[WIP] MPI communication layer #4142
Conversation
Well this is fun to see :) cc @kmpaul and @andersy005 from dask-mpi, who might find this interesting. Let's also cc @dalcinl from mpi4py to see if he has interest / time.

I think that the point around waiting to recv from many senders simultaneously is an interesting design challenge. I agree in general that polling on each one probably isn't ideal. Ideally there would be some event in MPI itself that we could use to trigger things. MPI wasn't designed for a dynamic application like Dask, but I wouldn't be surprised if there was some internal system that we could hook into here.
@ianthomas23 I'm also curious, was this done just for fun, or are you working on something in particular? I'm also curious to know if there are any performance differences. A computation that you might want to try is something like the following:

```python
from dask.distributed import Client
client = Client()  # or however you set up

import dask.array as da
import time

x = da.random.random((20000, 20000)).persist()

start = time.time()
y = (x + x.T).transpose().sum().compute()
stop = time.time()
print(stop - start)
```
@ianthomas23 This looks very cool! Thanks for sharing it. (And @mrocklin, thanks for CCing me.) I won't have time to look at the PR closely for a while, but I'll take a look and respond more next week.
@mrocklin There is no particular problem that I am trying to solve, so I guess we are in "just for fun" territory! I was looking around dask for issues I could help with and when I saw there was some interest in using MPI, for which my skillset is ideally suited, I thought I would take a stab at it. |
distributed/comm/mpi.py (outdated)
```python
while True:
    if self._cancel:
        return None, None
    if _mpi_comm.iprobe(source=self._source, tag=self._tag):
```
```python
status = MPI.Status()
if _mpi_comm.iprobe(source=self._source, tag=self._tag, status=status):
    source = status.Get_source()
    tag = status.Get_tag()
    msg = _mpi_comm.recv(source=source, tag=tag, status=status)
    return msg, status.Get_source()
```
Alternatively, if MPI.VERSION >= 3, you could use the following code, based on a matched probe, which is the thread-safe way of doing things:

```python
message = _mpi_comm.improbe(source=self._source, tag=self._tag, status=status)
if message is not None:
    msg = message.recv(status=status)
    return msg, status.Get_source()
```
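For context, here is a rough, self-contained sketch (not code from this PR) of how the matched-probe suggestion could combine with the cancellation check and the polling interval mentioned elsewhere in this thread; the function name, the `cancelled` callback, and the use of `COMM_WORLD` are assumptions made purely for illustration:

```python
import asyncio
from mpi4py import MPI

_mpi_comm = MPI.COMM_WORLD  # the PR uses a module-level communicator; COMM_WORLD is assumed here

async def poll_recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG,
                    cancelled=lambda: False, interval=0.005):
    """Poll with a matched probe until a message arrives or the caller cancels."""
    status = MPI.Status()
    while True:
        if cancelled():
            return None, None
        # improbe hands back a Message (or None), so the matching receive
        # cannot be stolen by another thread between the probe and the recv.
        message = _mpi_comm.improbe(source=source, tag=tag, status=status)
        if message is not None:
            return message.recv(), status.Get_source()
        await asyncio.sleep(interval)  # 5 ms polling interval mentioned in this thread
```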
@mrocklin Polling many senders to receive a message is built in to MPI, you just specify a wildcard source (MPI.ANY_SOURCE).
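For readers unfamiliar with MPI, a minimal sketch (not from this PR) of the built-in wildcard receive being described; run it under mpiexec with a few processes:

```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank != 0:
    # Every non-root rank sends one message to rank 0.
    comm.send({"from": rank}, dest=0, tag=7)
else:
    status = MPI.Status()
    # Rank 0 waits on all senders with a single wildcard receive;
    # the Status object reports which rank actually matched.
    for _ in range(comm.Get_size() - 1):
        msg = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        print("received", msg, "from rank", status.Get_source())
```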
Oh I see, great. So we're not polling on each Comm every 5ms, we're polling on all comms every 5ms. @dalcinl you mention thread safety. If mpi4py is thread-safe then another option here would be to set up a thread that just blocks waiting for a message. Once it comes in it would alert the main asyncio event loop thread, which would then respond. This might be more responsive than polling. Thoughts, anyone?
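A rough sketch of the thread-based approach being floated here, assuming the MPI library actually grants MPI_THREAD_MULTIPLE; the `mpi_listener` helper and the queue-based hand-off are invented for illustration and are not part of this PR:

```python
import asyncio
import threading
from mpi4py import MPI

comm = MPI.COMM_WORLD

def mpi_listener(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue) -> None:
    """Block in MPI receives on a helper thread and hand results to asyncio."""
    status = MPI.Status()
    while True:
        msg = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        # Wake the event loop thread without any periodic polling.
        loop.call_soon_threadsafe(queue.put_nowait, (status.Get_source(), msg))

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    threading.Thread(target=mpi_listener, args=(loop, queue), daemon=True).start()
    for _ in range(comm.Get_size() - 1):  # expect one message from every other rank
        source, msg = await queue.get()
        print("rank 0 got", msg, "from rank", source)

if comm.Get_rank() == 0:
    asyncio.run(main())
else:
    comm.send("hello", dest=0, tag=0)
```

One caveat: many MPI implementations busy-wait inside blocking calls by default, so a dedicated receive thread can consume a full core even when idle.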
@mrocklin I do not know all the details of all the code in this PR. I'm just saying that in MPI you can very well block in a receive call polling many sources; it is a builtin feature of MPI.

About thread safety, mpi4py is certainly thread-safe. However, the backend MPI implementation may not be, or users may request various levels of thread support. Besides that detail, given the way MPI implementations work (at least by default), blocking on a … mpi4py provides …

@ianthomas23 Look in …
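For reference, a small sketch (not from this PR) of how one could check the level of thread support the MPI implementation actually granted, which is the caveat being raised here:

```python
from mpi4py import MPI

# The thread support level granted by the MPI library may be lower than
# the level mpi4py asked for when MPI was initialised.
provided = MPI.Query_thread()
names = {
    MPI.THREAD_SINGLE: "THREAD_SINGLE",
    MPI.THREAD_FUNNELED: "THREAD_FUNNELED",
    MPI.THREAD_SERIALIZED: "THREAD_SERIALIZED",
    MPI.THREAD_MULTIPLE: "THREAD_MULTIPLE",
}
print("MPI thread support:", names.get(provided, provided))
if provided < MPI.THREAD_MULTIPLE:
    print("Blocking MPI calls from a background thread are not safe here.")
```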
@mrocklin No, you were correct the first time: each Comm (and Listener) is polling separately. At least this means there is plenty of room for improvement.
@dalcinl Thanks for your suggestions on the improved …
@mrocklin Your example computation

```python
x = da.random.random((20000, 20000)).persist()

start = time.time()
y = (x + x.T).transpose().sum().compute()
print('Seconds:', time.time() - start)
```

using 5 MPI processes (3 workers) on my 4-core i5 laptop takes ~2.2 s using the …
(Force-pushed from 3a70f09 to 8ecb3d2.)
This is as polished as I am going to get it. Now just a single object (per Scheduler/Client/Worker) polls for incoming messages and passes them to the appropriate receiver.

On a single multicore machine with 1 thread per worker (and all the debug stuff turned off) it is about 10% slower than TCP. Under these circumstances it is using shared memory. But in a more realistic test across multiple nodes of an Infiniband cluster it is only about half the speed of TCP, so comms were dominating. I suspect that a problem could be selected to perform better than this, e.g. one that sends a smaller number of larger messages. But ultimately the small-message 'chatter' between Scheduler/Client/Workers is slow due to the extra latency introduced by the MPI <-> dask communications. MPI isn't really designed to work in dask's event-driven way. Also, all of the low-level comms code in dask is an unnecessary overhead here, as MPI deals with all of it, but removing it from the distributed MPI code path would be serious refactoring work.

I note that I haven't touched this code for a few months and there have been changes to distributed in the meantime that may make a difference (either way!). It has been an interesting experiment, but it has now dropped off the bottom of my priority list. If someone else wants to continue with it they are welcome to do so.
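To make the final design a bit more concrete, here is a rough, hypothetical sketch of a single per-process poller that dispatches incoming messages to per-tag receivers; the class name, the queue-per-tag registration, and the polling interval are all invented for illustration and are not code from this PR:

```python
import asyncio
from mpi4py import MPI

class MPIPoller:
    """One poller per process: probe once for everything, then dispatch by tag."""

    def __init__(self, comm=MPI.COMM_WORLD, interval=0.005):
        self.comm = comm
        self.interval = interval
        self.queues = {}  # tag -> asyncio.Queue of (source, message)

    def register(self, tag):
        # Each receiver registers the tag it listens on and gets its own queue.
        return self.queues.setdefault(tag, asyncio.Queue())

    async def run(self):
        status = MPI.Status()
        while True:
            # One probe loop per process instead of one per Comm.
            while self.comm.iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status):
                msg = self.comm.recv(source=status.Get_source(), tag=status.Get_tag())
                queue = self.queues.get(status.Get_tag())
                if queue is not None:
                    queue.put_nowait((status.Get_source(), msg))
            await asyncio.sleep(self.interval)
```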
This is an implementation of the communication layer using MPI, for demonstration purposes. It is far from production standard and much is suboptimal, but I wanted to make the code available for others to look at and discuss. There is a related issue dask/dask-mpi#48.

It comes with a demo to run in `demo/demo.py`. You will need both `mpi4py` and `dask-mpi`, and you can start the demo from the command line using something like …, which will create 5 processes (1 Scheduler, 1 Client and 3 Workers). If you set `want_sleep = True` in the script it will sleep before and after the calculations to give you time to open the dashboard in a web browser. Much logging is performed; each MPI process logs to a different file, e.g. `rank3.log`, which has proved invaluable for debugging.

The actual MPI code is minimal and it fits fairly well into the `Comm`/`Connector`/`Listener`/`Backend` architecture, except that it doesn't work well with `asyncio`, so the handling of asynchronous sends and receives is achieved by regular polling (sketched below), which is far from ideal.

It is too early to talk about performance, but I can't help it. It is poor. And there is a significant difference between `mpi4py` using different MPI implementations, e.g. MPICH vs OpenMPI. But I am mostly concerned about it working correctly at this stage.

Some improvements I have started to think about:

- Use `distributed` utilities to convert between messages as python objects and byte streams rather than the (suspected slow) `mpi4py` equivalents.
- Polling for incoming messages is currently done on a per-`Comm` basis. Doing this once per MPI rank and passing it to the correct `Comm` should be much better.

Certainly the latter requires an understanding of how the higher-level `Scheduler`/`Worker`/`Client` layer interacts with the communications layer, and I will no doubt have some questions about this.

Enjoy!
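Since the description above notes that asynchronous sends and receives are handled by regular polling, here is a minimal sketch of that idea for the send side; the function name, the default interval, and the use of COMM_WORLD are assumptions for illustration, not code from this PR:

```python
import asyncio
from mpi4py import MPI

async def async_send(obj, dest, tag, comm=MPI.COMM_WORLD, interval=0.005):
    """Wrap a non-blocking MPI send in an asyncio-friendly polling loop."""
    request = comm.isend(obj, dest=dest, tag=tag)
    while True:
        completed, _ = request.test()  # non-blocking completion check
        if completed:
            return
        await asyncio.sleep(interval)  # yield to the event loop between polls
```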