Implement parallel replaygain analysis #3478
Conversation
* Add `--jobs` or `-j` to `replaygain` to set the pool size
* Single-threaded execution by default, if `--jobs` is unset
* If multithreaded, calls `Backend.compute_album_gain` or `Backend.compute_track_gain` asynchronously, with metadata storing/writing in the callback
sampsyo left a comment
Awesome; thanks for tackling this!!
I have a few comments in here about how we can reduce some code duplication by introducing a "maybe run in parallel" utility.
Could I ask you to please also add to the documentation for the plugin (and add a changelog entry)?
beetsplug/replaygain.py
Outdated
```python
self._log.debug(u'done analyzing {0}', item)

try:
    if hasattr(self, 'pool'):
```
Perhaps we could simplify these conditionals by introducing a utility like _apply that takes an optional pool argument? If the pool is None, then this utility would just call the function directly. Otherwise, it would use a proper call to run the function asynchronously. Then we can centralize the "maybe do this parallel" logic rather than replicating it for the individual calls.
Yeah, that would be much better. Thanks!
beetsplug/replaygain.py
Outdated
```python
def func(lib, opts, args):
    write = ui.should_write(opts.write)
    force = opts.force
    jobs = opts.jobs
```
Instead of defaulting to off when jobs = 0, let's use our cpu_count utility:
Line 760 in c1937e1
Should we do something like default = math.ceil(cpu_count() / 4) to keep CPU usage reasonable by default for large queries?
And maybe have it default to no parallelism if configured with threaded: no?
Maybe the best policy would be to match the behavior of the convert plugin:
https://github.com/beetbox/beets/blob/master/beetsplug/convert.py#L119
The option is named threads there, and it does default to the CPU count. I think it's probably OK to default to that? If people want to run in the background without disruption, it's easy to turn down the count…
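That defaulting policy could look roughly like this (a sketch: `resolve_threads` and its arguments are invented for illustration, though a `cpu_count` utility does exist in `beets.util`):

```python
import os


def cpu_count():
    # stand-in for the beets.util.cpu_count utility
    return os.cpu_count() or 1


def resolve_threads(cli_threads=None, config_threads=None, threaded=True):
    """CLI flag wins, then the config value, then the CPU count;
    `threaded: no` (or an explicit 0) disables the pool entirely."""
    if cli_threads is not None:
        return cli_threads
    if config_threads is not None:
        return config_threads
    return cpu_count() if threaded else 0
```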
And change local function `func` to `ReplayGainPlugin` method `replaygain_func` so that `self` is passed explicitly
Ok, so I remembered why I made it default to non-parallel :) While it does work with my on-disk library, the Travis build fails because I'm not propagating exceptions to the main thread. Apparently it's a common issue; I didn't know that...
Force-pushed a24baa5 to 6394d3e
Force-pushed 6394d3e to 0fede91
* With `bs1770gain` installed the `Bs1770gainBackend` tests fail, but this should be fixed by beetbox#3480.
sampsyo left a comment
Looking good! A couple more comments within.
beetsplug/replaygain.py
Outdated
```python
write = ui.should_write(opts.write)
force = opts.force

if opts.threads != 0:
```
It seems like this condition might be supposed to be != 1? Or better yet, the condition should come after checking the config file too?
- I wanted to include the option to completely bypass `ThreadPool` from the CLI with `--threads 0`; it seems like a useful feature down the line in case we ever suspect the parallel processing is messing something up.
- `ThreadPool(1)` still provides one worker thread that can work asynchronously, which may be useful in some cases? I don't know.
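For what it's worth, `ThreadPool(1)` really does run work asynchronously: `apply_async` returns immediately while the single worker is busy. A tiny illustration:

```python
import time
from multiprocessing.pool import ThreadPool

pool = ThreadPool(1)
start = time.monotonic()
async_result = pool.apply_async(time.sleep, (0.2,))
# apply_async returns right away; the worker sleeps in the background
submit_delay = time.monotonic() - start
async_result.wait()  # now block until the work actually finishes
total = time.monotonic() - start
pool.close()
pool.join()
```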
Force-pushed e41feb9 to b39df01
Looking pretty good! I assume we should probably figure out the exception logging thing before merging?
* `ExceptionWatcher` instance running in parallel to the pool, monitoring a queue for exceptions
* Pooled threads push exceptions to this queue; non-fatal exceptions are logged
* Application exits on a fatal exception in a pooled thread
* More front-end info logs in the CLI
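A rough sketch of that design (class and method names here are illustrative, not the exact plugin code):

```python
import queue
import threading


class ExceptionWatcher(threading.Thread):
    """Drain a queue of exceptions raised in pooled worker threads,
    handing each one to a handler (e.g. a logger)."""

    _STOP = object()  # sentinel to shut the watcher down

    def __init__(self, handler):
        super().__init__(daemon=True)
        self._queue = queue.Queue()
        self._handler = handler

    def push(self, exc):
        # called from pooled threads when their work raises
        self._queue.put(exc)

    def stop(self):
        self._queue.put(self._STOP)

    def run(self):
        while True:
            item = self._queue.get()
            if item is self._STOP:
                break
            self._handler(item)
```

A fatal-exception policy could then be layered on top by making the handler terminate the application for certain exception types.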
Force-pushed 02600ce to 9bd7842
Added an exception handling thread.
Force-pushed 6d9cec9 to 49c7100
Force-pushed 49c7100 to 9b283be
Pinging @sampsyo in case he didn't see the last request for review. Did you have anything else to add to this?
Force-pushed 7b4918d to 50757d3
Fixed the conflicts on my end. Some tests are failing though; fixing those now.
beetsplug/replaygain.py
Outdated
```python
def _store(self, item):
    try:
        item.store()
    except OperationalError:
        # test_replaygain.py :memory: library can fail with
        # `sqlite3.OperationalError: no such table: items`
        # but the second attempt succeeds
        item.store()
```
As far as I can tell, the exception is never thrown when working with an 'actual' database.
Is this still relevant? If so, what is blocking it? Is there anything you can do to help move it forward? This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
So sorry for the delay on this. Everything looks awesome, for the most part—I have no specific code-level comments on the actual parallelization. However, I do think we need to do something about the `OperationalError` workaround.
@sampsyo No problem, I know you've been super busy :) Yeah, that would be way more sensible. I'll look into it!
Force-pushed d512506 to e3205aa
I've pulled the sqlite3.OperationalError thing into test_replaygain.py as suggested.
Somewhat annoyingly this whole workaround is not "necessary" for the CI builds since they just skip all of the real tests...
I double-checked with the master branch and it's this PR that's causing this, for sure.
How do you think I should proceed?
- Leave it like this and submit an issue (which you could assign to me so I could keep looking into it)
- Look into it before going forward with this PR
```python
    in test_replaygain.py, so that it can still fail appropriately
    outside of these tests.
    """
    item.store()
```
Removed the OperationalError handler, added an explanation as to why this seemingly useless method is there
```python
        item.store()


@patch.object(ReplayGainPlugin, '_store', _store_retry_once)
```
As far as I can see there's no way to try/except on a higher level; the "false alarm" happens at every item, so it looks like we still need to ignore it for every item...
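The test workaround described above can be sketched in isolation like this (the `ReplayGainPlugin` and item classes here are stand-ins to show the patching mechanism, not the real beets code):

```python
import sqlite3
from unittest.mock import patch


class ReplayGainPlugin:
    """Stand-in for the real plugin class (illustrative only)."""

    def _store(self, item):
        item.store()


def _store_retry_once(self, item):
    """Test-only replacement for `_store`: absorb one spurious error."""
    try:
        item.store()
    except sqlite3.OperationalError:
        # the second attempt succeeds against the :memory: test library
        item.store()


class FlakyItem:
    """Fails the first store() call, mimicking the mysterious error."""

    def __init__(self):
        self.calls = 0

    def store(self):
        self.calls += 1
        if self.calls == 1:
            raise sqlite3.OperationalError("no such table: items")


with patch.object(ReplayGainPlugin, "_store", _store_retry_once):
    flaky = FlakyItem()
    ReplayGainPlugin()._store(flaky)  # retried once, then succeeded
```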
Awesome. This is really mysterious, but I like your temporary workaround to make the tests go through. Since you're comfortable with it, I'd like to forge ahead and merge this. Would you mind opening a new issue to get to the bottom of what's going on? Many thanks once again!!
This seems to give me problems: replaygain metadata doesn't get written to file on import. If I import an album with …
@skapazzo could you please open a new issue for this?
Sure, I'll do it straight away.
The parallelism strategy in #3478, in retrospect, used a pretty funky way to deal with exceptions in the asynchronous work---since `apply_async` has an `error_callback` parameter that's meant for exactly this. The problem is that the wrapped function would correctly log the exception *and then return `None`*, confusing any downstream code. Instead of just adding `None`-awareness to the callback, let's just avoid running the callback altogether in the case of an error.
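The `error_callback` approach described above can be demonstrated in isolation (a self-contained sketch, not the plugin code itself):

```python
from multiprocessing.pool import ThreadPool


def work(n):
    if n < 0:
        raise ValueError("negative input")
    return n * 2


results, errors = [], []
pool = ThreadPool(2)
pool.apply_async(work, (3,), callback=results.append,
                 error_callback=errors.append)
pool.apply_async(work, (-1,), callback=results.append,
                 error_callback=errors.append)
pool.close()
pool.join()
# the success callback never fires for the failing task, so no
# bogus None result ever reaches downstream code
```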
Merged from original PR #3478 by ybnd: beetbox/beets#3478
Implements #2224