[MXNET-331] NVLink communication pattern updated #8915
Laurawly wants to merge 8 commits into apache:master
Conversation
CopyFromTo(src[0], &buf.merged, priority);
return buf.merged;
The indentation needs to be corrected.
ElementwiseSum(reduce, &buf.merged);
CopyFromTo(stage.merged, &(buf.copy_buf[buf.copy_buf.size()-1]), priority);
reduce[reduce.size()-1] = buf.copy_buf[buf.copy_buf.size()-1];
ElementwiseSum(reduce, &buf.merged);
Missing priority? Should this line be ElementwiseSum(reduce, &buf.merged, priority);?
for (size_t i = 0; i < dst.size(); ++i) {
  if (i != static_cast<size_t>(dev_id)) {
    CopyFromTo(*dst[dev_id], dst[i], priority);
    CopyFromTo(*dst[dev_id], (dst[i]), priority);
nit: why is there a bracket around (dst[i])?
// copy to a random device first
int dev_id = key % dst.size();
CopyFromTo(src, dst[dev_id], priority);
CopyFromTo(src, (dst[dev_id]), priority);
nit: why is there a bracket around (dst[dev_id])?
std::vector<Context> g1, g2;
for (auto& d : devs) {
  if (d.dev_id < 4) g1.push_back(d);
Can we decide this by querying cuda instead of magic numbers?
Looks like this is not turned off when not using NVLink?
buf.copy_buf[i] = NDArray(
    buf.merged.shape(), buf.merged.ctx(), false, buf.merged.dtype());
}
if (buf.merged.is_none() && stage.copy_buf.empty()) {
I don't think buf.merged.is_none() will ever be true, since InitBuffersAndComm has initialized buf.merged?
std::vector<NDArray> compressed_recv_buf;
};
std::unordered_map<int, BufferEntry> merge_buf_;
std::unordered_map<int, BufferEntry> stage_buf_;
Please add a brief description of what this is for.
stage.merged = NDArray(s, ctx, false, type);
ctx_info[ctx.dev_id].second += s.Size();
}
} else {
What's the impact of this update on older devices/architectures?
@rahul003 please help review
@piiswrong When not using NVLink, this method uses more PCIe bandwidth than QPI, which also accelerates the original communication.
@Laurawly Can we also make use of this feature for the ReduceCompressed function?
return std::get<1>(a).Size() > std::get<1>(b).Size();
});
std::vector<Context> g1, g2;
Are there more readable variable names for g1 and g2 to explain their purpose?
So g represents group here. It means I separate the GPU cards into two communication groups.
I've added some description for g1 and g2, and moved them to class members.
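The two-group split described above could also be derived from a topology query instead of a hard-coded device id threshold. This is a minimal hypothetical sketch, not the PR's actual code: the function names are illustrative, and the peer-access predicate is injected so it can be tested without a GPU — in real code it would wrap a call such as cudaDeviceCanAccessPeer.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Split device ids into two communication groups by peer accessibility
// to device 0, rather than a hard-coded dev_id < 4 test.
std::pair<std::vector<int>, std::vector<int>> SplitByPeerAccess(
    int num_devices, const std::function<bool(int, int)>& can_access_peer) {
  std::vector<int> nvlink_group{0};  // device 0 anchors the first group
  std::vector<int> other_group;
  for (int d = 1; d < num_devices; ++d) {
    if (can_access_peer(0, d)) {
      nvlink_group.push_back(d);
    } else {
      other_group.push_back(d);
    }
  }
  return {nvlink_group, other_group};
}
```

With 8 devices and a predicate that mirrors the PR's dev_id < 4 split, this reproduces the same two groups while leaving the split decision to the topology query.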
@rahul003 Yeah, I'll do an update on the ReduceCompressed function as well. Thanks for the reminder.
@rahul003 Could you review my updates in ReduceCompressed? Thanks in advance!
@rahul003 ping
067b612 to e10997d
Any idea why test_rsp_pull failed?
pinned_ctx_ = Context::CPUPinned(0);
}
virtual ~Comm() { }
Comm() { pinned_ctx_ = Context::CPUPinned(0); }
Why don't we initialize it in the constructor's member-initializer list? It's more efficient.
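The reviewer's suggestion can be sketched as follows. The Context type here is an illustrative stand-in for the real one, kept just large enough to show the difference: the initializer list constructs pinned_ctx_ in place, while the original form default-constructs it and then assigns in the constructor body.

```cpp
#include <cassert>
#include <string>

// Illustrative stand-in for the real Context type in the PR.
struct Context {
  std::string kind;
  static Context CPUPinned(int) { return Context{"cpu_pinned"}; }
};

class Comm {
 public:
  // Construct pinned_ctx_ directly, avoiding default-construct + assign.
  Comm() : pinned_ctx_(Context::CPUPinned(0)) {}
  virtual ~Comm() {}
  const Context& pinned_ctx() const { return pinned_ctx_; }

 protected:
  Context pinned_ctx_;
};
```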
int key, const NDArray& src,
const std::vector<NDArray*> dst, int priority) = 0;
virtual void Broadcast(int key, const NDArray& src,
                       const std::vector<NDArray*> dst, int priority) = 0;
The vector is passed by value; shouldn't it be passed by reference?
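To make the cost concrete, here is a small sketch (the names are illustrative, not the PR's API): an element type with a copy counter shows that passing the vector by value copies every element at the call site, while a const reference copies nothing and still keeps the callee read-only.

```cpp
#include <cassert>
#include <vector>

// Stand-in element type that counts how many times it is copied.
struct CopyCounter {
  static int copies;
  CopyCounter() = default;
  CopyCounter(const CopyCounter&) { ++copies; }
};
int CopyCounter::copies = 0;

// By value: the vector and each element are copied at every call.
void BroadcastByValue(std::vector<CopyCounter> dst) { (void)dst; }

// By const reference: no copies, and the callee cannot mutate it.
void BroadcastByRef(const std::vector<CopyCounter>& dst) { (void)dst; }
```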
auto& buf = merge_buf_[key];
std::vector<NDArray> reduce(src.size());
if (buf.copy_buf.empty()) {
auto& stage = stage_buf_[key];
I would write out the full type here for readability.
void Broadcast(int key, const NDArray& src,
               const std::vector<NDArray*> dst, int priority) override {
void Broadcast(int key, const NDArray& src, const std::vector<NDArray*> dst,
Why is this file such a big header with no implementation file?
@rahul003 can you review the changes made for grad compression? Thanks!
rahul003 left a comment
Thanks for modifying ReduceCompressed too.
Could you please add a few comments to Reduce or ReduceCompressed explaining the flow of data? That would make it easier for others to maintain or develop this further.
}

/// \brief the NVLink-connected GPU groups
std::vector<Context> g1, g2;
It might be more readable to expand the names of these variables?
}
} else {
  // QPI connections are included: use spanning tree
  size_t gpu0, gpu1;
Could you add some comments on what the computation below is doing, and what gpu0 and gpu1 hold?
int id = src[i].ctx().dev_id;
if ((!buf.merged.is_none() && id == stage.merged.ctx().dev_id) ||
    (buf.merged.is_none() && i == 0)) {
  CopyFromTo(src[i], &(stage.merged), priority);
Why do we have to copy src[i] onto the same ctx? Can we directly use src[i]?
buf.copy_buf.resize(g1.size() + 1);
buf.compressed_recv_buf.resize(g1.size() + 1);
buf.compressed_send_buf.resize(g1.size() + 1);
buf.residual.resize(g1.size() + 1);
We are declaring g1.size() + 1 as the size of the array for residuals. Residuals are not sent to other GPUs, so we don't need to allocate one extra residual array.
That extra array is for copying back the reduced value from the stage buffer.
But the residual array only remains on the original GPU. It is never sent anywhere, but is only updated in place. Or are you just declaring an extra array for the residual so that you can index it the same way as the other arrays (copy_buf or compressed_recv_buf)?
Either way, we can avoid creating an extra residual array, right?
That would be significant memory savings (equal to the parameters of the model).
Oh, I see what you mean. Yeah, that's right. I'll correct it accordingly.
Tested tests/nightly/test_kvstore.py and it passes.
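The agreed-upon fix from this thread can be sketched as below. The types are illustrative stand-ins (plain ints instead of NDArrays): the copy and compressed buffers keep the extra slot for copying the reduced value back from the stage buffer, while the residual, which only lives on its own GPU and is updated in place, does not.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Illustrative buffer bundle mirroring the discussion above.
struct BufferEntry {
  std::vector<int> copy_buf, compressed_recv_buf, compressed_send_buf, residual;

  void Resize(std::size_t group_size) {
    copy_buf.resize(group_size + 1);
    compressed_recv_buf.resize(group_size + 1);
    compressed_send_buf.resize(group_size + 1);
    // No +1 here: avoids an extra model-sized allocation per key.
    residual.resize(group_size);
  }
};
```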
Could you please do these three things:
@eric-haibin-lin Could you take a look at whether test_rsp_pull passes now?
No, it is still failing.
7332be1 to d3aeed5
@piiswrong ping.
int mask = src.ctx().dev_mask();
if (mask == Context::kCPU) {
  for (auto d : dst) CopyFromTo(src, d, priority);
  for (auto& d : dst) CopyFromTo(src, d, priority);
Is broadcast not leveraging the new comm pattern? The same logic can be applied to fully utilize the bandwidth during copy, right? Or is the plan to do that in the next PR?
rctx.get_stream<gpu>()->Wait();
break;
}
case gpu::kDevMask: {
piiswrong left a comment
Two general points:
- Too many irrelevant cosmetic changes.
- The magic number 4 appears many times. Are you assuming there are 4 GPUs? This should be queried dynamically instead of being constant.
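One way to address this review point is to derive the split from a runtime device count (in real code obtained from cudaGetDeviceCount) instead of hard-coding 4. This is only a sketch: the function name is hypothetical, the count is injected so the logic runs without a GPU, and the half-split is an assumption based on the two-group scheme discussed in this PR, not the PR's exact logic.

```cpp
#include <cassert>

// Derive the first group's boundary from the device count rather than
// using a compile-time constant like NVLINK_SUPPORT.
int GroupSplitPoint(int device_count) {
  return device_count / 2;  // e.g. 8 GPUs -> first group is dev_id < 4
}
```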
*/
#ifndef MXNET_KVSTORE_COMM_H_
#define MXNET_KVSTORE_COMM_H_
#define NVLINK_SUPPORT 4
What's this? Can we avoid magic numbers?
on_complete();
}, Context::CPU(), {src.var(), row_id.var()}, {out_cpu.var()},
FnProperty::kNormal, priority, PROFILER_MESSAGE("KVStoreSparseRetain"));
[=](RunContext rctx, Engine::CallbackOnComplete on_complete) {
This sort of cosmetic change is really distracting for code review. Try not to do it next time.
reduce[0] = buf.merged;

if (buf.copy_buf.empty()) {
  // TODO(mli) this results in large device memory usage for huge ndarray,
Is this TODO handled by this PR?
inline static void ReduceSumCPU(
    const std::vector<DType*> &dptr, size_t offset, index_t size) {
template <typename DType>
inline static void ReduceSumCPU(const std::vector<DType*>& dptr,
These changes are really annoying.
reduce_s.resize(stage.copy_buf.size());
for (size_t i = 0, j = 0; i < src.size(); ++i) {
  int id = src[i].ctx().dev_id;
  if (id >= 4 || buf.merged.is_none()) {
Why 4? Can we avoid magic numbers?
Hi, the community has voted to associate code changes with JIRA (https://lists.apache.org/thread.html/ab22cf0e35f1bce2c3bf3bec2bc5b85a9583a3fe7fd56ba1bbade55f@%3Cdev.mxnet.apache.org%3E). We have updated the guidelines for contributors at https://cwiki.apache.org/confluence/display/MXNET/Development+Process. Please ensure that you have created a JIRA issue at https://issues.apache.org/jira/projects/MXNET/issues/ describing your work in this pull request, and include the JIRA title in your PR as [MXNET-xxxx] your title, where MXNET-xxxx is the JIRA id. Thanks!
When can we expect this to be merged?
Closing this for now due to inactivity.
Moved to #11357
Description
Optimized the kvstore communication pattern to make full use of NVLinks.
Checklist
Essentials
- Passed code style checking (make lint)
Changes
Comments