
replace res with two kinds of output, accumulator and timeseries #55

Merged
kain88-de merged 8 commits into MDAnalysis:master from VOD555:enh_prdf on Sep 18, 2018
Conversation

@VOD555 VOD555 commented Sep 6, 2018

Changes made in this Pull Request:

  • Replace res with two kinds of output: accumulator and timeseries.
    timeseries is the same as the old res, which just appends the result from each step.
    The other one, accumulator, accumulates the results so that we can reduce the size of the results of each parallelized task for functions such as rdf.

The example for timeseries is contact.py, and the one for accumulator is rdf.py.
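In outline, the two kinds of output differ like this (a minimal, self-contained sketch with invented helper names — not the actual pmda API):

```python
import numpy as np

def reduce_timeseries(res, frame_result):
    # "timeseries": keep every per-frame result, as the old `res` did
    res.append(frame_result)
    return res

def reduce_accumulator(res, frame_result):
    # "accumulator": sum per-frame results, so a worker only ever
    # holds one array of fixed size (useful for histograms like rdf)
    if isinstance(res, list):  # still the initial empty list
        return frame_result.copy()
    return res + frame_result

frames = [np.ones(75) for _ in range(250)]  # 250 fake per-frame results

ts, acc = [], []
for f in frames:
    ts = reduce_timeseries(ts, f)
    acc = reduce_accumulator(acc, f)

print(len(ts))    # 250 stored arrays
print(acc.shape)  # a single (75,) array
```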

PR Checklist

  • Tests?
  • Docs?
  • CHANGELOG updated?

@kain88-de

kain88-de commented Sep 6, 2018

I do not understand the motivation for this change. In what respect is the size of the results reduced? It looks to me like it would make writing an analysis more complicated, because one has to understand and decide whether they want a timeseries or an accumulator.

Note I'm not against this change and I appreciate your contribution. I would simply like to understand it before I merge.

pmda/parallel.py Outdated
if accum == []:
    accum = res_sf['accumulator']
else:
    accum += res_sf['accumulator']

@kain88-de kain88-de Sep 6, 2018


This if-else clause is not necessary; `accum.extend(res_sf)` would do the same in both cases. I'm pretty sure just `+=` would also work, because you create the empty list first.

@kain88-de (Member)

Ah ok, so I misinterpreted your intent on this line. You are using the empty list as a zero or None value.

if accum == []:
    accum = res   # convert accum from an empty list to a numpy array
else:
    accum += res  # here we add two numpy arrays

Is that correct?

@VOD555 (Collaborator, Author)

Yes, that's correct.

@VOD555

VOD555 commented Sep 6, 2018

@kain88-de Currently, res stores the results from each step. That is fine for functions such as contacts and rms, whose final results are timeseries.
However, the final result of pmda.rdf is not a timeseries. If we want a 75-bin rdf for a system with 1000 frames using 4 cores, pmda.rdf will generate a 1×75 matrix in each step, and self._results would be a 4×250×75 matrix, but what we want is just a 1×75 matrix.
For a numpy.array whose elements are also numpy arrays, for example a = np.array([np.array([b, c, d]), np.array([e])]), the addition a + a will be np.array([np.array([2b, 2c, 2d]), np.array([2e])]), not a.append(a). Due to this property, accumulator does matrix addition in each parallelized block, so the per-block result is always a 1×75 matrix. As a result, self._results would be a 4×75 matrix, which is much smaller.

@orbeckst

orbeckst commented Sep 7, 2018

As @VOD555 says, there are two different ways in which one might want to collect data: as a timeseries or as a histogram (accumulator). In principle one could create the timeseries and then histogram in _conclude, but this typically requires too much memory. RDF or a density calculation are examples of the latter. We haven't found a good way to use the existing PMDA framework for histogramming, hence this experimental PR.

If you have an alternative suggestion we're all ears – one reason for this PR is to discuss and prototype.

@kain88-de

kain88-de commented Sep 7, 2018

OK, I do understand the intent better now. You are saying we have the wrong reduce step in our map-reduce scheme. In the timeseries case we want to stack arrays together as the reduce step, or simply collecting all results in the right order. For the accumulation, we want to add arrays.

The current reduce step is res.append(computation). We would simply have to change this reduce function based on our cases.

timeseries: res = np.stack(res, computation)
accumulator: res = np.sum(res, computation)

To implement this the ParallelAnalysisBase class would get a new attribute. reduce = np.stack. This attribute can be overwritten in a child class like RDF to reduce = np.sum. Such an attribute would allow full control over the reduce-step in the analysis and requires no change in existing classes.
With this new attribute line

res.append(self._single_frame(ts, agroups))

would change to

res = self.reduce(res, self._single_frame(ts, agroups))

Note I'm not sure about the stack command. One should check this!
I'm sure I missed some details in implementing this but the general idea could work.

edit: updated code example to make it clearer

@orbeckst

orbeckst commented Sep 7, 2018

Kind of. I would say that conceptually we do the reduce of map-reduce in _conclude() when the individual results from the workers are combined. But we are also doing a reduction over the individual frames for each worker, and that is the main problem here. I am drawing the distinction because it is possible that the reduction in _conclude could be different from the one over frames in a single worker. For example, if we were to calculate densities d[i] in the workers then the final reduction would need to add them with weights, d = w[0]*d[0] + w[1]*d[1] + w[2]*d[2] + ... where w[i] = n_frames[i]/n_frames_total.

I like your idea of defining a reduce function. I would actually be explicit and add a new API method

class ParallelAnalysis():
    ...
    @staticmethod
    def _reduce(*args):
        """implement the reduction of a single frame"""
        # timeseries:
        return np.stack(args)

        # accumulator
        return np.sum(args)

and then use self._reduce() in the _dask_helper() function

res = self._reduce(res, self._single_frame(ts, agroups))

(I would make it a static method to make clear that it should only work on the input and not use anything in the class.)

If the _reduce() method works for _conclude() then you can also use it there directly:

def _conclude(self):
    self.result = self._reduce(*self._result)
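The weighted combination described at the top of this comment could be sketched as follows (illustrative names only, not part of PMDA):

```python
import numpy as np

def weighted_reduce(densities, n_frames):
    """Combine per-worker densities d[i] with weights w[i] = n_frames[i]/total."""
    n_total = sum(n_frames)
    return sum((n / n_total) * d for n, d in zip(n_frames, densities))

# three hypothetical workers with unequal block sizes
densities = [np.full(75, 1.0), np.full(75, 2.0), np.full(75, 4.0)]
n_frames = [500, 250, 250]
d = weighted_reduce(densities, n_frames)  # every bin: 0.5*1 + 0.25*2 + 0.25*4 = 2.0
```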

@kain88-de

I would make it a static method to make clear that it should only work on the input and not use anything in the class.

Good idea!

I like your idea of defining a reduce function. I would actually be explicit and add a new API method

Yes being explicit is good. We should then better document the scheme we currently use for parallelization.

@richardjgowers

Maybe we just need to have a mixin which defines what sort of reduce happens...

class SomethingOverTime(ParallelBase, AppendReduce):

class AverageThing(ParallelBase, MeanReduce):

@orbeckst

orbeckst commented Sep 7, 2018

Just to clarify: my idea was that when you write a class, you write your own

  • _single_frame()
  • _reduce()

because both can be rather specific.

We should have a default that maintains the old "timeseries" behavior. Maybe something like the following

class ParallelAnalysis():
    ...
    @staticmethod
    def _reduce(*args):
        """ 'append' action for a time series

        Note: all args *must* be of type list so that they create one bigger list.

        The alternative would be to do something like

            return args[0].extend(args[1:])

        """
        return sum(args, [])

(I think np.hstack is not flexible enough.)

@VOD555

VOD555 commented Sep 7, 2018

What `np.stack()` does is to join a sequence of arrays along a new axis. One example for it is

>>> a = np.array([1, 2, 3])
>>> b = np.array([2, 3, 4])
>>> np.stack((a, b))
array([[1, 2, 3],
       [2, 3, 4]])

The input should be one array, and np.stack() cannot deal with dictionaries (such as the _single_frame output of pmda.rdf). So I think np.stack() is not flexible enough.

list.append is still a good choice for us, and the default self._reduce() for 'timeseries' may be like

class ParallelAnalysis():
    ...
    def _reduce(self, res, result_single_frame):
        res.append(result_single_frame)
        return res

I've tested this, and it works well with the functions we currently have.

@kain88-de

Maybe we just need to have a mixin which define what sort of reduce happens...

Using staticmethods or inheritance through mixin classes achieves the same goal. Either way is OK. Personally I prefer a staticmethod because I can test it easily, and special one-off solutions are easier to implement.

For the staticmethod solution the default should be append, and we should offer an accumulate utility function that can be used by others. In RDF this would be

@staticmethod
def _reduce(a, b):
    return parallel.accumulate(a, b)

The accumulate function can be what @VOD555 has currently written.

Using a staticmethod goes more in the direction of using composition instead of inheritance. There are lots of arguments on the internet available for composition and against it. I personally like it better.
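A possible shape for such an accumulate helper (hypothetical — the real name and location of `parallel.accumulate` are whatever this PR settles on):

```python
import numpy as np

def accumulate(res, result_single_frame):
    """'add' reduction: fold a single-frame result into the running total.

    ``res`` starts as the framework's default empty list; the first frame
    replaces it with an array, after which plain numpy addition is used.
    """
    if isinstance(res, list) and not res:
        return np.asarray(result_single_frame, dtype=float)
    return res + np.asarray(result_single_frame)

res = []
for _ in range(4):
    res = accumulate(res, np.ones(75))

print(res.shape)  # (75,)
```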

pmda/contacts.py Outdated
y = y[0]
return y

def _reduce(self, res, result_single_frame):
@kain88-de (Member)

shouldn't be needed. It can take the default function from the base class.


def _reduce(self, res, result_single_frame):
    """ 'add' action for an accumulator"""
    if res == []:
@kain88-de (Member)

The pythonic way is to declare the variable as None. My preferred way is to pre-calculate the size of res and initialize it as res = np.zeros(shape). This way the if-else clause isn't needed.

@kain88-de (Member)

Your problem is that res is initialized in ParallelBase right?

@VOD555 (Collaborator, Author)

Yes. I'm thinking about some way like

class ParallelAnalysis():
    ...
    def _initial_res(shape):
        res = np.zeros(shape)
        return res
    ...
    self._res_settings = {'shape': shape}

    def _dask_helper():
        res = self._initial_res(**self._res_settings)
        ...
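A runnable version of this pre-allocation idea (all names here are invented for illustration, not the real pmda classes):

```python
import numpy as np

class Analysis:
    def __init__(self, shape):
        # settings needed to build an empty result container per worker
        self._res_settings = {'shape': shape}

    @staticmethod
    def _initial_res(shape):
        return np.zeros(shape)

    def _dask_helper(self, frame_results):
        # start from zeros so plain '+=' works without an if-else clause
        res = self._initial_res(**self._res_settings)
        for r in frame_results:
            res += r
        return res

a = Analysis(shape=(75,))
out = a._dask_helper([np.ones(75) for _ in range(10)])
print(out.shape)  # (75,)
```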

pmda/parallel.py Outdated
return np.asarray(res), np.asarray(times_io), np.asarray(
    times_compute), b_universe.elapsed

def _reduce(self, res, result_single_frame):
@kain88-de (Member)

This should be a static function.

@codecov

codecov bot commented Sep 11, 2018

Codecov Report

Merging #55 into master will decrease coverage by 1.79%.
The diff coverage is 100%.


@@            Coverage Diff            @@
##           master      #55     +/-   ##
=========================================
- Coverage   99.25%   97.46%   -1.8%     
=========================================
  Files           7        7             
  Lines         270      276      +6     
  Branches       28       27      -1     
=========================================
+ Hits          268      269      +1     
- Misses          1        4      +3     
- Partials        1        3      +2
Impacted Files Coverage Δ
pmda/rdf.py 90.9% <100%> (-9.1%) ⬇️
pmda/parallel.py 100% <100%> (ø) ⬆️


@kain88-de

@VOD555 thanks for the fixes! Do you mind adding documentation here for the new _reduce static method and explaining its purpose, default, and optional requirements for a class inheriting from ParallelAnalysisBase?

@VOD555

VOD555 commented Sep 13, 2018

@VOD555 thanks for the fixes! Do you mind adding documentation here for the new _reduce static method and explaining its purpose, default, and optional requirements for a class inheriting from ParallelAnalysisBase?

Sure!


@kain88-de kain88-de left a comment


Thanks for adding the documentation. I have some suggestions to improve them further.

else:
    # Add two numpy arrays
    res += result_single_frame
return res
@kain88-de (Member)

Thanks for adding the documentation. Here is my suggestion for an updated version. I use your structure but try to make the text read more fluently.

# NOT REQUIRED
# Called for every frame. ``res`` contains all the results
# before the current time step, and ``result_single_frame`` is the
# result of self._single_frame for the current time step. The
# return value is the updated ``res``. The default is to append
# results to a python list. This approach is sufficient for
# time-series data.
res.append(result_single_frame)
# This is not suitable for every analysis. To add results over
# multiple frames this function can be overwritten. The default
# value for ``res`` is an empty list. Here we change the type to
# the return type of `self._single_frame`. Afterwards we can
# safely use addition to accumulate the results.
if res == []:
    res = result_single_frame
else:
    res += result_single_frame
# If you overwrite this function *always* return the updated
# ``res`` at the end.
return res

@kain88-de (Member)

A reference to _reduce also needs to be added here

@kain88-de (Member)

To parallelize the analysis ``ParallelAnalysisBase`` separates the trajectory
into work blocks containing multiple frames. The number of blocks is equal to
the number of available cores or dask workers. This minimizes the number of
python processes that are started during a calculation. Accumulation of frames
within a block happens in the `self._reduce` function. A consequence when
using dask is that adding additional workers during a computation will not
result in a reduction of run-time.

I think it would be beneficial to add this text at the beginning of the class documentation to better explain why _reduce is needed.

@kain88-de

@VOD555 I made my suggested changes myself. You can have a last look over them before merging.

@VOD555

VOD555 commented Sep 18, 2018

@kain88-de That's very good, thanks.

@kain88-de kain88-de merged commit 2ff3db2 into MDAnalysis:master Sep 18, 2018
@VOD555 VOD555 deleted the enh_prdf branch October 18, 2019 23:53