From 5009b133377b6f76f7ae21f7c082f43eade12e80 Mon Sep 17 00:00:00 2001 From: Leyuan Wang Date: Thu, 25 Jan 2018 20:56:12 +0000 Subject: [PATCH 1/8] merge with master --- src/kvstore/comm.h | 586 +++++++++++++++++++++++++++------------------ 1 file changed, 359 insertions(+), 227 deletions(-) diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h index da2d03d519f4..4c8d1ac4a4c2 100644 --- a/src/kvstore/comm.h +++ b/src/kvstore/comm.h @@ -23,18 +23,18 @@ #ifndef MXNET_KVSTORE_COMM_H_ #define MXNET_KVSTORE_COMM_H_ #include -#include #include -#include #include -#include -#include +#include #include -#include "mxnet/ndarray.h" -#include "gradient_compression.h" +#include +#include +#include #include "../ndarray/ndarray_function.h" #include "../operator/tensor/sparse_retain-inl.h" #include "./kvstore_utils.h" +#include "gradient_compression.h" +#include "mxnet/ndarray.h" namespace mxnet { namespace kvstore { /** @@ -42,10 +42,8 @@ namespace kvstore { */ class Comm { public: - Comm() { - pinned_ctx_ = Context::CPUPinned(0); - } - virtual ~Comm() { } + Comm() { pinned_ctx_ = Context::CPUPinned(0); } + virtual ~Comm() {} /** * \brief init key with the data shape and storage shape */ @@ -54,33 +52,32 @@ class Comm { /** * \brief returns src[0] + .. 
+ src[src.size()-1] */ - virtual const NDArray& Reduce( - int key, const std::vector& src, int priority) = 0; + virtual const NDArray& Reduce(int key, const std::vector& src, + int priority) = 0; /** * \brief copy from src to dst[i] for every i */ - virtual void Broadcast( - int key, const NDArray& src, - const std::vector dst, int priority) = 0; + virtual void Broadcast(int key, const NDArray& src, + const std::vector dst, int priority) = 0; /** * \brief broadcast src to dst[i] with target row_ids for every i - * \param dst a list of destination row_sparse NDArray and its target row_ids to broadcast, + * \param dst a list of destination row_sparse NDArray and its target row_ids + to broadcast, where the row_ids are expected to be unique and sorted - * \param use_copy if set to true, directly copy src to dst[i] without looking up the + * \param use_copy if set to true, directly copy src to dst[i] without looking + up the provided row_ids */ - virtual void BroadcastRowSparse(int key, const NDArray& src, - const std::vector>& dst, - const bool use_copy, - const int priority) = 0; + virtual void BroadcastRowSparse( + int key, const NDArray& src, + const std::vector>& dst, const bool use_copy, + const int priority) = 0; /** * \brief return a pinned contex */ - Context pinned_ctx() const { - return pinned_ctx_; - } + Context pinned_ctx() const { return pinned_ctx_; } /** * \brief Sets gradient compression parameters to be able to @@ -108,7 +105,7 @@ class CommCPU : public Comm { // TODO(junwu) delete the following data member, now for benchmark only is_serial_push_ = dmlc::GetEnv("MXNET_KVSTORE_SERIAL_PUSH", 0); } - virtual ~CommCPU() { } + virtual ~CommCPU() {} void Init(int key, const NDArrayStorageType stype, const TShape& shape, int type = mshadow::kFloat32) override { @@ -140,25 +137,28 @@ class CommCPU : public Comm { reduce[0] = buf.merged; if (buf.copy_buf.empty()) { - buf.copy_buf.resize(src.size()-1); + buf.copy_buf.resize(src.size() - 1); for (size_t j = 0; j 
< src.size() - 1; ++j) { // allocate NDArray based on storage type - buf.copy_buf[j] = NDArray( - src[0].shape(), pinned_ctx_, false, src[0].dtype()); + buf.copy_buf[j] = + NDArray(src[0].shape(), pinned_ctx_, false, src[0].dtype()); } } for (size_t i = 1; i < src.size(); ++i) { - CopyFromTo(src[i], &(buf.copy_buf[i-1]), priority); - reduce[i] = buf.copy_buf[i-1]; - const_vars[i-1] = reduce[i].var(); + CopyFromTo(src[i], &(buf.copy_buf[i - 1]), priority); + reduce[i] = buf.copy_buf[i - 1]; + const_vars[i - 1] = reduce[i].var(); } Engine::Get()->PushAsync( - [reduce, this](RunContext rctx, Engine::CallbackOnComplete on_complete) { - ReduceSumCPU(reduce); - on_complete(); - }, Context::CPU(), const_vars, {reduce[0].var()}, - FnProperty::kCPUPrioritized, priority, PROFILER_MESSAGE("KVStoreReduce")); + [reduce, this](RunContext rctx, + Engine::CallbackOnComplete on_complete) { + ReduceSumCPU(reduce); + on_complete(); + }, + Context::CPU(), const_vars, {reduce[0].var()}, + FnProperty::kCPUPrioritized, priority, + PROFILER_MESSAGE("KVStoreReduce")); } else { // buf.merged is a sparse ndarray. 
@@ -168,8 +168,8 @@ class CommCPU : public Comm { if (buf.copy_buf.empty()) { buf.copy_buf.resize(src.size()); for (size_t j = 0; j < src.size(); ++j) { - buf.copy_buf[j] = NDArray( - src[0].storage_type(), src[0].shape(), pinned_ctx_, true, src[0].dtype()); + buf.copy_buf[j] = NDArray(src[0].storage_type(), src[0].shape(), + pinned_ctx_, true, src[0].dtype()); } } for (size_t i = 0; i < src.size(); ++i) { @@ -178,44 +178,46 @@ class CommCPU : public Comm { const_vars[i] = reduce[i].var(); } NDArray result = buf.merged; - Resource rsc = ResourceManager::Get()->Request(result.ctx(), - ResourceRequest(ResourceRequest::kTempSpace)); + Resource rsc = ResourceManager::Get()->Request( + result.ctx(), ResourceRequest(ResourceRequest::kTempSpace)); Engine::Get()->PushAsync( - [reduce, result, rsc, this](RunContext rctx, Engine::CallbackOnComplete on_complete) { - NDArray out = result; - is_serial_push_? - ReduceSumCPUExSerial(reduce, &out) - : mxnet::ndarray::ElementwiseSum(rctx.get_stream(), rsc, reduce, &out); - on_complete(); - }, Context::CPU(), const_vars, {result.var(), rsc.var}, - FnProperty::kCPUPrioritized, priority, PROFILER_MESSAGE("KVStoreReduce")); + [reduce, result, rsc, this](RunContext rctx, + Engine::CallbackOnComplete on_complete) { + NDArray out = result; + is_serial_push_ ? 
ReduceSumCPUExSerial(reduce, &out) + : mxnet::ndarray::ElementwiseSum( + rctx.get_stream(), rsc, reduce, &out); + on_complete(); + }, + Context::CPU(), const_vars, {result.var(), rsc.var}, + FnProperty::kCPUPrioritized, priority, + PROFILER_MESSAGE("KVStoreReduce")); } return buf.merged; } - void Broadcast(int key, const NDArray& src, - const std::vector dst, int priority) override { + void Broadcast(int key, const NDArray& src, const std::vector dst, + int priority) override { int mask = src.ctx().dev_mask(); if (mask == Context::kCPU) { - for (auto d : dst) CopyFromTo(src, d, priority); + for (auto& d : dst) CopyFromTo(src, d, priority); } else { // first copy data to cpu, then broadcast auto& buf = merge_buf_[key]; CopyFromTo(src, &buf.merged, priority); - for (auto d : dst) CopyFromTo(buf.merged, d, priority); + for (auto& d : dst) CopyFromTo(buf.merged, d, priority); } } void BroadcastRowSparse(int key, const NDArray& src, const std::vector>& dst, - const bool use_copy, - const int priority) override { + const bool use_copy, const int priority) override { using namespace mshadow; CHECK_EQ(src.storage_type(), kRowSparseStorage) - << "BroadcastRowSparse expects row-sparse src NDArray"; + << "BroadcastRowSparse expects row-sparse src NDArray"; CHECK_EQ(src.ctx().dev_mask(), Context::kCPU) - << "BroadcastRowSparse with src on gpu context not supported"; + << "BroadcastRowSparse with src on gpu context not supported"; for (size_t i = 0; i < dst.size(); ++i) { NDArray* out = dst[i].first; NDArray row_id = dst[i].second; @@ -223,40 +225,47 @@ class CommCPU : public Comm { CopyFromTo(src, out, priority); } else { CHECK_EQ(out->storage_type(), kRowSparseStorage) - << "BroadcastRowSparse expects row_sparse dst NDArray"; + << "BroadcastRowSparse expects row_sparse dst NDArray"; CHECK_EQ(row_id.ctx().dev_mask(), Context::kCPU) - << "BroadcastRowSparse with row_indices on gpu context not supported"; + << "BroadcastRowSparse with row_indices on gpu context not " + 
"supported"; // retain according to unique indices - const bool use_sparse_retain = (src.shape()[0] != src.storage_shape()[0]) - || (row_id.dtype() != out->aux_type(rowsparse::kIdx)) - || (out->ctx().dev_mask() != Context::kGPU); + const bool use_sparse_retain = + (src.shape()[0] != src.storage_shape()[0]) || + (row_id.dtype() != out->aux_type(rowsparse::kIdx)) || + (out->ctx().dev_mask() != Context::kGPU); if (use_sparse_retain) { // use sparse_retain op const bool is_to_gpu = out->ctx().dev_mask() == Context::kGPU; - NDArray out_cpu = is_to_gpu? NDArray(kRowSparseStorage, src.shape(), - src.ctx(), true, src.dtype(), src.aux_types()) : *out; + NDArray out_cpu = + is_to_gpu ? NDArray(kRowSparseStorage, src.shape(), src.ctx(), + true, src.dtype(), src.aux_types()) + : *out; Engine::Get()->PushAsync( - [=](RunContext rctx, Engine::CallbackOnComplete on_complete) { - const TBlob& indices = row_id.data(); - NDArray temp = out_cpu; // get rid of const qualifier - op::SparseRetainOpForwardRspImpl(rctx.get_stream(), - src, indices, kWriteTo, - &temp); - on_complete(); - }, Context::CPU(), {src.var(), row_id.var()}, {out_cpu.var()}, - FnProperty::kNormal, priority, PROFILER_MESSAGE("KVStoreSparseRetain")); + [=](RunContext rctx, Engine::CallbackOnComplete on_complete) { + const TBlob& indices = row_id.data(); + NDArray temp = out_cpu; // get rid of const qualifier + op::SparseRetainOpForwardRspImpl( + rctx.get_stream(), src, indices, kWriteTo, &temp); + on_complete(); + }, + Context::CPU(), {src.var(), row_id.var()}, {out_cpu.var()}, + FnProperty::kNormal, priority, + PROFILER_MESSAGE("KVStoreSparseRetain")); if (is_to_gpu) { CopyFromTo(out_cpu, out, priority); } } else { // direct copy rows Engine::Get()->PushAsync( - [=](RunContext rctx, Engine::CallbackOnComplete on_complete) { - CopyRetainedRowsToGPU(rctx.get_stream(), rctx.get_stream(), - src, row_id, out); - // wait for GPU operations to complete - rctx.get_stream()->Wait(); - on_complete(); - }, out->ctx(), 
{src.var(), row_id.var()}, {out->var()}, - FnProperty::kCopyToGPU, priority, PROFILER_MESSAGE("KVStoreCopyRetainedRowsToGPU")); + [=](RunContext rctx, Engine::CallbackOnComplete on_complete) { + CopyRetainedRowsToGPU(rctx.get_stream(), + rctx.get_stream(), src, row_id, out); + // wait for GPU operations to complete + rctx.get_stream()->Wait(); + on_complete(); + }, + out->ctx(), {src.var(), row_id.var()}, {out->var()}, + FnProperty::kCopyToGPU, priority, + PROFILER_MESSAGE("KVStoreCopyRetainedRowsToGPU")); } } } @@ -270,22 +279,22 @@ class CommCPU : public Comm { */ void CopyRetainedRowsToGPU(mshadow::Stream* cpu_stream, mshadow::Stream* gpu_stream, - const NDArray& src, - const NDArray& indices, + const NDArray& src, const NDArray& indices, NDArray* dst) { #if MXNET_USE_CUDA == 1 CHECK_EQ(src.storage_type(), kRowSparseStorage) - << "CopyRetainedRowsToGPU expects row-sparse src NDArray"; + << "CopyRetainedRowsToGPU expects row-sparse src NDArray"; CHECK_EQ(src.ctx().dev_mask(), Context::kCPU) - << "CopyRetainedRowsToGPU with src on gpu context not supported"; + << "CopyRetainedRowsToGPU with src on gpu context not supported"; CHECK_EQ(src.storage_shape()[0], src.shape()[0]) - << "CopyRetainedRowsToGPU only supports src rsp with full rows"; + << "CopyRetainedRowsToGPU only supports src rsp with full rows"; CHECK_EQ(indices.storage_type(), kDefaultStorage); CHECK_EQ(indices.ctx().dev_mask(), Context::kCPU); CHECK_EQ(dst->storage_type(), kRowSparseStorage); CHECK_EQ(dst->ctx().dev_mask(), Context::kGPU); CHECK_EQ(indices.dtype(), dst->aux_type(rowsparse::kIdx)) - << "CopyRetainedRowsToGPU only supports same data type for idx array and dst aux_data(0)"; + << "CopyRetainedRowsToGPU only supports same data type for idx array " + "and dst aux_data(0)"; if (!src.storage_initialized() || indices.data().Size() == 0U) { op::FillZerosRspImpl(gpu_stream, *dst); return; @@ -299,29 +308,33 @@ class CommCPU : public Comm { dst->CheckAndAlloc({Shape1(num_rows_retained)}); TBlob 
dst_data = dst->data(); TBlob dst_idx_data = dst->aux_data(rowsparse::kIdx); - MSHADOW_TYPE_SWITCH(src.dtype(), DType, { - MSHADOW_IDX_TYPE_SWITCH(indices.dtype(), IType, { - // copy idx array - Tensor dst_idx_tensor = dst_idx_data.FlatTo1D(gpu_stream); - const Tensor idx_tensor = idx_data.FlatTo1D(cpu_stream); - Copy(dst_idx_tensor, idx_tensor, gpu_stream); - // copy src data - const Tensor src_data_tensor = src_data.get_with_shape( - Shape2(src_data.shape_[0], row_length), cpu_stream); - Tensor dst_data_tensor = dst_data.get_with_shape( - Shape2(dst_data.shape_[0], row_length), gpu_stream); - for (size_t i = 0; i < num_rows_retained; ++i) { - Copy(dst_data_tensor[i], src_data_tensor[idx_tensor[i]], gpu_stream); - } - }) - }) + MSHADOW_TYPE_SWITCH( + src.dtype(), DType, {MSHADOW_IDX_TYPE_SWITCH(indices.dtype(), IType, { + // copy idx array + Tensor dst_idx_tensor = + dst_idx_data.FlatTo1D(gpu_stream); + const Tensor idx_tensor = + idx_data.FlatTo1D(cpu_stream); + Copy(dst_idx_tensor, idx_tensor, gpu_stream); + // copy src data + const Tensor src_data_tensor = + src_data.get_with_shape( + Shape2(src_data.shape_[0], row_length), cpu_stream); + Tensor dst_data_tensor = + dst_data.get_with_shape( + Shape2(dst_data.shape_[0], row_length), gpu_stream); + for (size_t i = 0; i < num_rows_retained; ++i) { + Copy(dst_data_tensor[i], src_data_tensor[idx_tensor[i]], + gpu_stream); + } + })}) #else LOG(FATAL) << "GPU not enabled"; #endif } // reduce sum into val[0] - inline void ReduceSumCPU(const std::vector &in_data) { + inline void ReduceSumCPU(const std::vector& in_data) { MSHADOW_TYPE_SWITCH(in_data[0].dtype(), DType, { std::vector dptr(in_data.size()); for (size_t i = 0; i < in_data.size(); ++i) { @@ -335,7 +348,8 @@ class CommCPU : public Comm { } // serial implementation of reduce sum for row sparse NDArray. 
- inline void ReduceSumCPUExSerial(const std::vector &in, NDArray *out) { + inline void ReduceSumCPUExSerial(const std::vector& in, + NDArray* out) { using namespace rowsparse; using namespace mshadow; auto stype = out->storage_type(); @@ -374,7 +388,8 @@ class CommCPU : public Comm { CHECK_EQ(indices.size(), total_num_rows); // dedup indices std::sort(indices.begin(), indices.end()); - indices.resize(std::unique(indices.begin(), indices.end()) - indices.begin()); + indices.resize(std::unique(indices.begin(), indices.end()) - + indices.begin()); // the one left are unique non-zero rows size_t nnr = indices.size(); // allocate memory for output @@ -406,12 +421,12 @@ class CommCPU : public Comm { }); } - template - inline static void ReduceSumCPU( - const std::vector &dptr, size_t offset, index_t size) { + template + inline static void ReduceSumCPU(const std::vector& dptr, + size_t offset, index_t size) { using namespace mshadow; // NOLINT(*) Tensor in_0(dptr[0] + offset, Shape1(size)); - for (size_t i = 1; i < dptr.size(); i+=4) { + for (size_t i = 1; i < dptr.size(); i += 4) { switch (dptr.size() - i) { case 1: { Tensor in_1(dptr[i] + offset, Shape1(size)); @@ -420,22 +435,22 @@ class CommCPU : public Comm { } case 2: { Tensor in_1(dptr[i] + offset, Shape1(size)); - Tensor in_2(dptr[i+1] + offset, Shape1(size)); + Tensor in_2(dptr[i + 1] + offset, Shape1(size)); in_0 += in_1 + in_2; break; } case 3: { Tensor in_1(dptr[i] + offset, Shape1(size)); - Tensor in_2(dptr[i+1] + offset, Shape1(size)); - Tensor in_3(dptr[i+2] + offset, Shape1(size)); + Tensor in_2(dptr[i + 1] + offset, Shape1(size)); + Tensor in_3(dptr[i + 2] + offset, Shape1(size)); in_0 += in_1 + in_2 + in_3; break; } default: { Tensor in_1(dptr[i] + offset, Shape1(size)); - Tensor in_2(dptr[i+1] + offset, Shape1(size)); - Tensor in_3(dptr[i+2] + offset, Shape1(size)); - Tensor in_4(dptr[i+3] + offset, Shape1(size)); + Tensor in_2(dptr[i + 1] + offset, Shape1(size)); + Tensor in_3(dptr[i + 2] + offset, 
Shape1(size)); + Tensor in_4(dptr[i + 3] + offset, Shape1(size)); in_0 += in_1 + in_2 + in_3 + in_4; break; } @@ -443,15 +458,15 @@ class CommCPU : public Comm { } } - template + template inline void ReduceSumCPUImpl(std::vector dptr, size_t total) { const size_t step = std::min(bigarray_bound_, static_cast(4 << 10)); - long ntask = (total + step - 1) / step; // NOLINT(*) + long ntask = (total + step - 1) / step; // NOLINT(*) if (total < bigarray_bound_ || nthread_reduction_ <= 1) { ReduceSumCPU(dptr, 0, total); } else { - #pragma omp parallel for schedule(static) num_threads(nthread_reduction_) - for (long j = 0; j < ntask; ++j) { // NOLINT(*) +#pragma omp parallel for schedule(static) num_threads(nthread_reduction_) + for (long j = 0; j < ntask; ++j) { // NOLINT(*) size_t k = static_cast(j); size_t begin = std::min(k * step, total); size_t end = std::min((k + 1) * step, total); @@ -484,11 +499,9 @@ class CommCPU : public Comm { */ class CommDevice : public Comm { public: - CommDevice() { - inited_ = false; - } + CommDevice() { inited_ = false; } - virtual ~CommDevice() { } + virtual ~CommDevice() {} void Init(int key, const NDArrayStorageType stype, const TShape& shape, int dtype = mshadow::kFloat32) override { @@ -523,44 +536,84 @@ class CommDevice : public Comm { } InitBuffersAndComm(src); + auto& stage = stage_buf_[key]; auto& buf = merge_buf_[key]; - std::vector reduce(src.size()); + std::vector reduce_s; const NDArrayStorageType stype = buf.merged.storage_type(); if (stype == kDefaultStorage) { - CopyFromTo(src[0], &(buf.merged), priority); - reduce[0] = buf.merged; - - if (buf.copy_buf.empty()) { - // TODO(mli) this results in large device memory usage for huge ndarray, - // such as the largest fullc in VGG. consider to do segment reduce with - // NDArray.Slice or gpu direct memory access. 
for the latter, we need to - // remove some ctx check, and also it reduces 20% perf - buf.copy_buf.resize(src.size()-1); - for (size_t i = 0; i < src.size()-1; ++i) { - buf.copy_buf[i] = NDArray( - buf.merged.shape(), buf.merged.ctx(), false, buf.merged.dtype()); - } + if (buf.merged.is_none() && stage.copy_buf.empty()) { + stage.copy_buf.resize(src.size() - 1); + for (size_t i = 0; i < src.size() - 1; ++i) + stage.copy_buf[i] = NDArray(stage.merged.shape(), stage.merged.ctx(), + false, stage.merged.dtype()); } - for (size_t i = 0; i < src.size()-1; ++i) { - CopyFromTo(src[i+1], &(buf.copy_buf[i]), priority); - reduce[i+1] = buf.copy_buf[i]; + reduce_s.resize(stage.copy_buf.size() + 1); + for (size_t i = 0, j = 0; i < src.size(); ++i) { + int id = src[i].ctx().dev_id; + if ((!buf.merged.is_none() && id == stage.merged.ctx().dev_id) || + (buf.merged.is_none() && i == 0)) { + CopyFromTo(src[i], &(stage.merged), priority); + reduce_s[0] = stage.merged; + } else if (id >= 4 || buf.merged.is_none()) { + CopyFromTo(src[i], &(stage.copy_buf[j]), priority); + reduce_s[j + 1] = stage.copy_buf[j]; + j++; + } } } else { - if (buf.copy_buf.empty()) { - buf.copy_buf.resize(src.size()); - for (size_t j = 0; j < src.size(); ++j) { - buf.copy_buf[j] = NDArray( - buf.merged.storage_type(), buf.merged.shape(), buf.merged.ctx(), - true, buf.merged.dtype()); + if (buf.merged.is_none() && stage.copy_buf.empty()) { + stage.copy_buf.resize(src.size()); + for (size_t j = 0; j < src.size(); ++j) + stage.copy_buf[j] = NDArray(stage.merged.shape(), stage.merged.ctx(), + true, stage.merged.dtype()); + } + reduce_s.resize(stage.copy_buf.size()); + for (size_t i = 0, j = 0; i < src.size(); ++i) { + int id = src[i].ctx().dev_id; + if (id >= 4 || buf.merged.is_none()) { + CopyFromTo(src[i], &(stage.copy_buf[j]), priority); + reduce_s[j] = stage.copy_buf[j]; + j++; } } - for (size_t i = 0; i < src.size(); ++i) { - CopyFromTo(src[i], &(buf.copy_buf[i]), priority); - reduce[i] = buf.copy_buf[i]; + } 
+ ElementwiseSum(reduce_s, &stage.merged, priority); + // Main reduce result on gpu 0 including the partial result from gpu 4 + if (!buf.merged.is_none()) { + std::vector reduce; + if (stype == kDefaultStorage) { + reduce.resize(buf.copy_buf.size() + 1); + for (size_t i = 0, j = 0; i < src.size(); ++i) { + int id = src[i].ctx().dev_id; + if (id == buf.merged.ctx().dev_id) { + CopyFromTo(src[i], &(buf.merged), priority); + reduce[0] = buf.merged; + } else if (id < 4) { + CopyFromTo(src[i], &(buf.copy_buf[j]), priority); + reduce[j + 1] = buf.copy_buf[j]; + j++; + } + } + } else { + reduce.resize(buf.copy_buf.size()); + for (size_t i = 0, j = 0; i < src.size(); ++i) { + int id = src[i].ctx().dev_id; + if (id < 4) { + CopyFromTo(src[i], &(buf.copy_buf[j]), priority); + reduce[j] = buf.copy_buf[j]; + j++; + } + } } + CopyFromTo(stage.merged, &(buf.copy_buf[buf.copy_buf.size() - 1]), + priority); + reduce[reduce.size() - 1] = buf.copy_buf[buf.copy_buf.size() - 1]; + ElementwiseSum(reduce, &buf.merged); + } else { + return stage.merged; } - ElementwiseSum(reduce, &buf.merged, priority); + return buf.merged; } @@ -577,16 +630,16 @@ class CommDevice : public Comm { buf.residual.resize(src.size()); for (size_t i = 0; i < src.size(); ++i) { - buf.copy_buf[i] = NDArray(buf.merged.shape(), buf.merged.ctx(), - false, buf.merged.dtype()); - buf.residual[i] = NDArray(buf.merged.shape(), src[i].ctx(), - false, buf.merged.dtype()); + buf.copy_buf[i] = NDArray(buf.merged.shape(), buf.merged.ctx(), false, + buf.merged.dtype()); + buf.residual[i] = NDArray(buf.merged.shape(), src[i].ctx(), false, + buf.merged.dtype()); buf.residual[i] = 0; int64_t small_size = gc_->GetCompressedSize(buf.merged.shape().Size()); - buf.compressed_recv_buf[i] = NDArray(TShape{small_size}, buf.merged.ctx(), - false, buf.merged.dtype()); + buf.compressed_recv_buf[i] = NDArray( + TShape{small_size}, buf.merged.ctx(), false, buf.merged.dtype()); buf.compressed_send_buf[i] = NDArray(TShape{small_size}, 
src[i].ctx(), - false, buf.merged.dtype()); + false, buf.merged.dtype()); } } @@ -594,10 +647,13 @@ class CommDevice : public Comm { // compress before copy // this is done even if the data is on same context as copy_buf because // we don't want the training to be biased towards data on this GPU - gc_->Quantize(src[i], &(buf.compressed_send_buf[i]), &(buf.residual[i]), priority); + gc_->Quantize(src[i], &(buf.compressed_send_buf[i]), &(buf.residual[i]), + priority); - if (buf.compressed_send_buf[i].ctx() != buf.compressed_recv_buf[i].ctx()) { - CopyFromTo(buf.compressed_send_buf[i], &(buf.compressed_recv_buf[i]), priority); + if (buf.compressed_send_buf[i].ctx() != + buf.compressed_recv_buf[i].ctx()) { + CopyFromTo(buf.compressed_send_buf[i], &(buf.compressed_recv_buf[i]), + priority); } else { // avoid memory copy when they are on same context buf.compressed_recv_buf[i] = buf.compressed_send_buf[i]; @@ -610,32 +666,36 @@ class CommDevice : public Comm { return buf.merged; } - void Broadcast(int key, const NDArray& src, - const std::vector dst, int priority) override { + void Broadcast(int key, const NDArray& src, const std::vector dst, + int priority) override { if (!inited_) { // copy to a random device first int dev_id = key % dst.size(); - CopyFromTo(src, dst[dev_id], priority); + CopyFromTo(src, (dst[dev_id]), priority); for (size_t i = 0; i < dst.size(); ++i) { if (i != static_cast(dev_id)) { - CopyFromTo(*dst[dev_id], dst[i], priority); + CopyFromTo(*dst[dev_id], (dst[i]), priority); } } } else { auto& buf = merge_buf_[key]; - CopyFromTo(src, &buf.merged, priority); + auto& stage = stage_buf_[key]; + if (!buf.merged.is_none()) CopyFromTo(src, &buf.merged, priority); + CopyFromTo(src, &stage.merged, priority); for (auto d : dst) { - CopyFromTo(buf.merged, d, priority); + if (d->ctx().dev_id >= 4 || buf.merged.is_none()) + CopyFromTo(stage.merged, d, priority); + else + CopyFromTo(buf.merged, d, priority); } } } void BroadcastRowSparse(int key, const NDArray& 
src, const std::vector>& dst, - const bool use_copy, - const int priority) override { + const bool use_copy, const int priority) override { CHECK_EQ(src.storage_type(), kRowSparseStorage) - << "BroadcastRowSparse expects row-sparse src NDArray"; + << "BroadcastRowSparse expects row-sparse src NDArray"; for (size_t i = 0; i < dst.size(); ++i) { NDArray* out = dst[i].first; @@ -644,38 +704,44 @@ class CommDevice : public Comm { CopyFromTo(src, out, priority); } else { CHECK_EQ(out->storage_type(), kRowSparseStorage) - << "BroadcastRowSparse expects row_sparse dst NDArray"; + << "BroadcastRowSparse expects row_sparse dst NDArray"; const bool is_diff_ctx = out->ctx() != src.ctx(); - NDArray out_gpu = is_diff_ctx? NDArray(kRowSparseStorage, out->shape(), - src.ctx(), true, out->dtype(), out->aux_types()) : *out; + NDArray out_gpu = + is_diff_ctx ? NDArray(kRowSparseStorage, out->shape(), src.ctx(), + true, out->dtype(), out->aux_types()) + : *out; CHECK_EQ(row_id.ctx(), src.ctx()) - << "row_id and src are expected to be on the same context"; - - Engine::Get()->PushAsync([=](RunContext rctx, Engine::CallbackOnComplete on_complete) { - NDArray temp = out_gpu; - const TBlob& indices = row_id.data(); - switch (temp.ctx().dev_mask()) { - case cpu::kDevMask: { - mxnet::common::SparseRetainOpForwardRspWrapper(rctx.get_stream(), - src, indices, kWriteTo, &temp); - break; - } + << "row_id and src are expected to be on the same context"; + + Engine::Get()->PushAsync( + [=](RunContext rctx, Engine::CallbackOnComplete on_complete) { + NDArray temp = out_gpu; + const TBlob& indices = row_id.data(); + switch (temp.ctx().dev_mask()) { + case cpu::kDevMask: { + mxnet::common::SparseRetainOpForwardRspWrapper( + rctx.get_stream(), src, indices, kWriteTo, &temp); + break; + } #if MXNET_USE_CUDA - case gpu::kDevMask: { - mxnet::common::SparseRetainOpForwardRspWrapper(rctx.get_stream(), - src, indices, kWriteTo, &temp); - // wait for GPU operations to complete - rctx.get_stream()->Wait(); - 
break; - } + case gpu::kDevMask: { + mxnet::common::SparseRetainOpForwardRspWrapper( + rctx.get_stream(), src, indices, kWriteTo, &temp); + // wait for GPU operations to complete + rctx.get_stream()->Wait(); + break; + } #endif - default: LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR; - } - on_complete(); - }, out_gpu.ctx(), {src.var(), row_id.var()}, {out_gpu.var()}, - FnProperty::kNormal, priority, PROFILER_MESSAGE("KVStoreSparseRetain")); + default: + LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR; + } + on_complete(); + }, + out_gpu.ctx(), {src.var(), row_id.var()}, {out_gpu.var()}, + FnProperty::kNormal, priority, + PROFILER_MESSAGE("KVStoreSparseRetain")); if (is_diff_ctx) { CopyFromTo(out_gpu, out, priority); } @@ -694,7 +760,7 @@ class CommDevice : public Comm { } int n = static_cast(gpus.size()); int enabled = 0; - std::vector p2p(n*n); + std::vector p2p(n * n); for (int i = 0; i < n; ++i) { cudaSetDevice(gpus[i]); for (int j = 0; j < n; j++) { @@ -704,21 +770,21 @@ class CommDevice : public Comm { cudaError_t e = cudaDeviceEnablePeerAccess(gpus[j], 0); if (e == cudaSuccess || e == cudaErrorPeerAccessAlreadyEnabled) { ++enabled; - p2p[i*n+j] = 1; + p2p[i * n + j] = 1; } } } } - if (enabled != n*(n-1)) { + if (enabled != n * (n - 1)) { // print warning info if not fully enabled - LOG(WARNING) << "only " << enabled << " out of " - << n*(n-1) << " GPU pairs are enabled direct access. " + LOG(WARNING) << "only " << enabled << " out of " << n * (n - 1) + << " GPU pairs are enabled direct access. " << "It may affect the performance. " << "You can set MXNET_ENABLE_GPU_P2P=0 to turn it off"; std::string access(n, '.'); for (int i = 0; i < n; ++i) { for (int j = 0; j < n; ++j) { - access[j] = p2p[i*n+j] ? 'v' : '.'; + access[j] = p2p[i * n + j] ? 
'v' : '.'; } LOG(WARNING) << access; } @@ -729,36 +795,101 @@ class CommDevice : public Comm { using KeyAttrs = std::tuple; // try to allocate buff on device evenly void InitMergeBuffer(const std::vector& devs) { - std::sort(sorted_key_attrs_.begin(), sorted_key_attrs_.end(), []( - const KeyAttrs& a, const KeyAttrs& b) { - return std::get<1>(a).Size() > std::get<1>(b).Size(); - }); - - std::unordered_map> ctx_info; - for (auto d : devs) { - ctx_info[d.dev_id] = std::make_pair(d, 0); + std::sort(sorted_key_attrs_.begin(), sorted_key_attrs_.end(), + [](const KeyAttrs& a, const KeyAttrs& b) { + return std::get<1>(a).Size() > std::get<1>(b).Size(); + }); + + std::vector g1, g2; + for (auto& d : devs) { + if (d.dev_id < 4) + g1.push_back(d); + else + g2.push_back(d); } - for (size_t i = 0; i < sorted_key_attrs_.size(); ++i) { - const int key = std::get<0>(sorted_key_attrs_[i]); - const TShape& shape = std::get<1>(sorted_key_attrs_[i]); - const int type = std::get<2>(sorted_key_attrs_[i]); - const NDArrayStorageType stype = std::get<3>(sorted_key_attrs_[i]); - auto& buf = merge_buf_[key]; - Context ctx; - size_t min_size = std::numeric_limits::max(); - for (auto it = ctx_info.begin(); it != ctx_info.end(); ++it) { - size_t size = it->second.second; - if (size <= min_size) { - ctx = it->second.first; - min_size = size; + if (g1.empty() || g2.empty()) { + // 4 gpus are all connected by NVLinks: use all-to-all + std::unordered_map> ctx_info; + for (auto d : devs) { + ctx_info[d.dev_id] = std::make_pair(d, 0); + } + for (size_t i = 0; i < sorted_key_attrs_.size(); ++i) { + const int key = std::get<0>(sorted_key_attrs_[i]); + const TShape shape = std::get<1>(sorted_key_attrs_[i]); + const int type = std::get<2>(sorted_key_attrs_[i]); + const NDArrayStorageType stype = std::get<3>(sorted_key_attrs_[i]); + auto& stage = stage_buf_[key]; + Context ctx; + size_t min_size = std::numeric_limits::max(); + for (auto it = ctx_info.begin(); it != ctx_info.end(); ++it) { + size_t size = 
it->second.second; + if (size <= min_size) { + ctx = it->second.first; + min_size = size; + } } + if (stype == kDefaultStorage) { + stage.merged = NDArray(shape, ctx, false, type); + } else { + stage.merged = NDArray(stype, shape, ctx, true, type); + } + ctx_info[ctx.dev_id].second += shape.Size(); } - if (stype == kDefaultStorage) { - buf.merged = NDArray(shape, ctx, false, type); - } else { - buf.merged = NDArray(stype, shape, ctx, true, type); + } else { + // QPI connections are included: use spanning tree + size_t gpu0, gpu1; + for (gpu0 = 0, gpu1 = 0; gpu0 < g1.size() && gpu1 < g2.size();) { + if (g2[gpu1].dev_id - g1[gpu0].dev_id == 4) + break; + else if (g2[gpu1].dev_id - g1[gpu0].dev_id > 4) + gpu0++; + else + gpu1++; + } + if (gpu0 == g1.size() || gpu1 == g2.size()) gpu0 = gpu1 = 0; + for (size_t i = 0; i < sorted_key_attrs_.size(); ++i) { + const int key = std::get<0>(sorted_key_attrs_[i]); + const TShape shape = std::get<1>(sorted_key_attrs_[i]); + const int type = std::get<2>(sorted_key_attrs_[i]); + const NDArrayStorageType stype = std::get<3>(sorted_key_attrs_[i]); + auto& buf = merge_buf_[key]; + auto& stage = stage_buf_[key]; + if (stype == kDefaultStorage) { + buf.merged = NDArray(shape, g1[gpu0], false, type); + if (buf.copy_buf.empty()) { + buf.copy_buf.resize(g1.size()); + for (size_t i = 0; i < g1.size(); ++i) + buf.copy_buf[i] = NDArray(buf.merged.shape(), buf.merged.ctx(), + false, buf.merged.dtype()); + } + + stage.merged = NDArray(shape, g2[gpu1], false, type); + if (stage.copy_buf.empty()) { + stage.copy_buf.resize(g2.size() - 1); + for (size_t i = 0; i < g2.size() - 1; ++i) + stage.copy_buf[i] = + NDArray(stage.merged.shape(), stage.merged.ctx(), false, + stage.merged.dtype()); + } + } else { + buf.merged = NDArray(stype, shape, g1[gpu0], true, type); + if (buf.copy_buf.empty()) { + buf.copy_buf.resize(g1.size() + 1); + for (size_t i = 0; i < g1.size() + 1; ++i) + buf.copy_buf[i] = NDArray(buf.merged.shape(), buf.merged.ctx(), + false, 
buf.merged.dtype()); + } + + stage.merged = NDArray(stype, shape, g2[gpu1], true, type); + if (stage.copy_buf.empty()) { + stage.copy_buf.resize(g2.size()); + for (size_t i = 0; i < g2.size(); ++i) + stage.copy_buf[i] = + NDArray(stage.merged.shape(), stage.merged.ctx(), false, + stage.merged.dtype()); + } + } } - ctx_info[ctx.dev_id].second += shape.Size(); } inited_ = true; } @@ -778,6 +909,7 @@ class CommDevice : public Comm { std::vector compressed_recv_buf; }; std::unordered_map merge_buf_; + std::unordered_map stage_buf_; bool inited_; }; From f4e45cf53146fcd39f5b6859384f6e6f3d0b5abc Mon Sep 17 00:00:00 2001 From: Leyuan Wang Date: Tue, 5 Dec 2017 08:59:09 +0000 Subject: [PATCH 2/8] review comments addressed --- src/kvstore/comm.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h index 4c8d1ac4a4c2..4c0c57de01b4 100644 --- a/src/kvstore/comm.h +++ b/src/kvstore/comm.h @@ -22,6 +22,7 @@ */ #ifndef MXNET_KVSTORE_COMM_H_ #define MXNET_KVSTORE_COMM_H_ +#define NVLINK_SUPPORT 4 #include #include #include @@ -674,7 +675,7 @@ class CommDevice : public Comm { CopyFromTo(src, (dst[dev_id]), priority); for (size_t i = 0; i < dst.size(); ++i) { if (i != static_cast(dev_id)) { - CopyFromTo(*dst[dev_id], (dst[i]), priority); + CopyFromTo(*dst[dev_id], dst[i], priority); } } } else { @@ -909,6 +910,7 @@ class CommDevice : public Comm { std::vector compressed_recv_buf; }; std::unordered_map merge_buf_; + /// \brief the small buffer for partially merged data std::unordered_map stage_buf_; bool inited_; }; From a11158f36789a0d3dd7f256e22735dee1f814123 Mon Sep 17 00:00:00 2001 From: Leyuan Wang Date: Tue, 19 Dec 2017 23:38:00 +0000 Subject: [PATCH 3/8] nvlink communication applied to ReduceCompressed --- src/kvstore/comm.h | 166 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 130 insertions(+), 36 deletions(-) diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h index 4c0c57de01b4..1a927f85b95b 100644 --- 
a/src/kvstore/comm.h +++ b/src/kvstore/comm.h @@ -622,48 +622,141 @@ class CommDevice : public Comm { int priority) { InitBuffersAndComm(src); auto& buf = merge_buf_[key]; - std::vector reduce(src.size()); - if (buf.copy_buf.empty()) { + auto& stage = stage_buf_[key]; + if (buf.merged.is_none() && stage.copy_buf.empty()) { // one buf for each context - buf.copy_buf.resize(src.size()); - buf.compressed_recv_buf.resize(src.size()); - buf.compressed_send_buf.resize(src.size()); - buf.residual.resize(src.size()); + stage.copy_buf.resize(src.size()); + stage.compressed_recv_buf.resize(src.size()); + stage.compressed_send_buf.resize(src.size()); + stage.residual.resize(src.size()); for (size_t i = 0; i < src.size(); ++i) { - buf.copy_buf[i] = NDArray(buf.merged.shape(), buf.merged.ctx(), false, - buf.merged.dtype()); - buf.residual[i] = NDArray(buf.merged.shape(), src[i].ctx(), false, - buf.merged.dtype()); - buf.residual[i] = 0; + stage.copy_buf[i] = NDArray(stage.merged.shape(), stage.merged.ctx(), + false, stage.merged.dtype()); + stage.residual[i] = NDArray(stage.merged.shape(), src[i].ctx(), false, + stage.merged.dtype()); + stage.residual[i] = 0; + int64_t small_size = + gc_->GetCompressedSize(stage.merged.shape().Size()); + stage.compressed_recv_buf[i] = + NDArray(TShape{small_size}, stage.merged.ctx(), false, + stage.merged.dtype()); + stage.compressed_send_buf[i] = NDArray(TShape{small_size}, src[i].ctx(), + false, stage.merged.dtype()); + } + } else if (!buf.merged.is_none()) { + if (buf.copy_buf.empty() && stage.copy_buf.empty()) { + buf.copy_buf.resize(g1.size() + 1); + buf.compressed_recv_buf.resize(g1.size() + 1); + buf.compressed_send_buf.resize(g1.size() + 1); + buf.residual.resize(g1.size() + 1); + stage.copy_buf.resize(g2.size()); + stage.compressed_recv_buf.resize(g2.size()); + stage.compressed_send_buf.resize(g2.size()); + stage.residual.resize(g2.size()); + for (size_t i = 0, j = 0, k = 0; i < src.size(); ++i) { + int id = src[i].ctx().dev_id; + if 
(id < NVLINK_SUPPORT) { + buf.copy_buf[j] = NDArray(buf.merged.shape(), buf.merged.ctx(), + false, buf.merged.dtype()); + buf.residual[j] = NDArray(buf.merged.shape(), src[i].ctx(), false, + buf.merged.dtype()); + buf.residual[j] = 0; + int64_t small_size = + gc_->GetCompressedSize(buf.merged.shape().Size()); + buf.compressed_recv_buf[j] = + NDArray(TShape{small_size}, buf.merged.ctx(), false, + buf.merged.dtype()); + buf.compressed_send_buf[j] = NDArray( + TShape{small_size}, src[i].ctx(), false, buf.merged.dtype()); + j++; + } else { + stage.copy_buf[k] = + NDArray(stage.merged.shape(), stage.merged.ctx(), false, + stage.merged.dtype()); + stage.residual[k] = NDArray(stage.merged.shape(), src[i].ctx(), + false, stage.merged.dtype()); + stage.residual[k] = 0; + int64_t small_size = + gc_->GetCompressedSize(stage.merged.shape().Size()); + stage.compressed_recv_buf[k] = + NDArray(TShape{small_size}, stage.merged.ctx(), false, + stage.merged.dtype()); + stage.compressed_send_buf[k] = NDArray( + TShape{small_size}, src[i].ctx(), false, stage.merged.dtype()); + k++; + } + } + buf.copy_buf[g1.size()] = NDArray(buf.merged.shape(), buf.merged.ctx(), + false, buf.merged.dtype()); + buf.residual[g1.size()] = NDArray( + buf.merged.shape(), stage.merged.ctx(), false, buf.merged.dtype()); + buf.residual[g1.size()] = 0; int64_t small_size = gc_->GetCompressedSize(buf.merged.shape().Size()); - buf.compressed_recv_buf[i] = NDArray( + buf.compressed_recv_buf[g1.size()] = NDArray( TShape{small_size}, buf.merged.ctx(), false, buf.merged.dtype()); - buf.compressed_send_buf[i] = NDArray(TShape{small_size}, src[i].ctx(), - false, buf.merged.dtype()); + buf.compressed_send_buf[g1.size()] = NDArray( + TShape{small_size}, stage.merged.ctx(), false, buf.merged.dtype()); } } + std::vector reduce_s(stage.copy_buf.size()); + std::vector reduce(buf.copy_buf.size()); + + for (size_t i = 0, j = 0, k = 0; i < src.size(); ++i) { + int id = src[i].ctx().dev_id; + if (id >= NVLINK_SUPPORT || 
buf.merged.is_none()) { + // compress before copy + // this is done even if the data is on same context as copy_buf because + // we don't want the training to be biased towards data on this GPU + gc_->Quantize(src[i], &(stage.compressed_send_buf[j]), + &(stage.residual[j]), priority); + + if (stage.compressed_send_buf[j].ctx() != + stage.compressed_recv_buf[j].ctx()) { + CopyFromTo(stage.compressed_send_buf[j], + &(stage.compressed_recv_buf[j]), priority); + } else { + // avoid memory copy when they are on same context + stage.compressed_recv_buf[j] = stage.compressed_send_buf[j]; + } - for (size_t i = 0; i < src.size(); ++i) { - // compress before copy - // this is done even if the data is on same context as copy_buf because - // we don't want the training to be biased towards data on this GPU - gc_->Quantize(src[i], &(buf.compressed_send_buf[i]), &(buf.residual[i]), - priority); - - if (buf.compressed_send_buf[i].ctx() != - buf.compressed_recv_buf[i].ctx()) { - CopyFromTo(buf.compressed_send_buf[i], &(buf.compressed_recv_buf[i]), - priority); + gc_->Dequantize(stage.compressed_recv_buf[j], &(stage.copy_buf[j]), + priority); + reduce_s[j] = stage.copy_buf[j]; + j++; } else { - // avoid memory copy when they are on same context - buf.compressed_recv_buf[i] = buf.compressed_send_buf[i]; - } + gc_->Quantize(src[i], &(buf.compressed_send_buf[k]), &(buf.residual[k]), + priority); - gc_->Dequantize(buf.compressed_recv_buf[i], &(buf.copy_buf[i]), priority); - reduce[i] = buf.copy_buf[i]; + if (buf.compressed_send_buf[k].ctx() != + buf.compressed_recv_buf[k].ctx()) { + CopyFromTo(buf.compressed_send_buf[k], &(buf.compressed_recv_buf[k]), + priority); + } else { + // avoid memory copy when they are on same context + buf.compressed_recv_buf[k] = buf.compressed_send_buf[k]; + } + + gc_->Dequantize(buf.compressed_recv_buf[k], &(buf.copy_buf[k]), + priority); + reduce[k] = buf.copy_buf[k]; + k++; + } } - ElementwiseSum(reduce, &buf.merged); + ElementwiseSum(reduce_s, 
&stage.merged); + if (buf.merged.is_none()) { + return stage.merged; + } else { + gc_->Quantize(stage.merged, &buf.compressed_send_buf[g1.size()], + &(buf.residual[g1.size()]), priority); + CopyFromTo(buf.compressed_send_buf[g1.size()], + &(buf.compressed_recv_buf[g1.size()]), priority); + gc_->Dequantize(buf.compressed_recv_buf[g1.size()], + &(buf.copy_buf[g1.size()]), priority); + reduce[reduce.size() - 1] = buf.copy_buf[g1.size()]; + ElementwiseSum(reduce, &buf.merged); + } + return buf.merged; } @@ -684,7 +777,7 @@ class CommDevice : public Comm { if (!buf.merged.is_none()) CopyFromTo(src, &buf.merged, priority); CopyFromTo(src, &stage.merged, priority); for (auto d : dst) { - if (d->ctx().dev_id >= 4 || buf.merged.is_none()) + if (d->ctx().dev_id >= NVLINK_SUPPORT || buf.merged.is_none()) CopyFromTo(stage.merged, d, priority); else CopyFromTo(buf.merged, d, priority); @@ -801,7 +894,6 @@ class CommDevice : public Comm { return std::get<1>(a).Size() > std::get<1>(b).Size(); }); - std::vector g1, g2; for (auto& d : devs) { if (d.dev_id < 4) g1.push_back(d); @@ -809,7 +901,7 @@ class CommDevice : public Comm { g2.push_back(d); } if (g1.empty() || g2.empty()) { - // 4 gpus are all connected by NVLinks: use all-to-all + // all gpus are all connected by NVLinks: use all-to-all std::unordered_map> ctx_info; for (auto d : devs) { ctx_info[d.dev_id] = std::make_pair(d, 0); @@ -840,9 +932,9 @@ class CommDevice : public Comm { // QPI connections are included: use spanning tree size_t gpu0, gpu1; for (gpu0 = 0, gpu1 = 0; gpu0 < g1.size() && gpu1 < g2.size();) { - if (g2[gpu1].dev_id - g1[gpu0].dev_id == 4) + if (g2[gpu1].dev_id - g1[gpu0].dev_id == NVLINK_SUPPORT) break; - else if (g2[gpu1].dev_id - g1[gpu0].dev_id > 4) + else if (g2[gpu1].dev_id - g1[gpu0].dev_id > NVLINK_SUPPORT) gpu0++; else gpu1++; @@ -895,6 +987,8 @@ class CommDevice : public Comm { inited_ = true; } + /// \brief the NVLinked connected gpu groups + std::vector g1, g2; std::vector sorted_key_attrs_; 
/// \brief temporal space for pushing and pulling struct BufferEntry { From faa74f5e54bbd0b2da614dc247de065f35ed16d8 Mon Sep 17 00:00:00 2001 From: Leyuan Wang Date: Thu, 25 Jan 2018 21:20:48 +0000 Subject: [PATCH 4/8] comments addressed --- src/kvstore/comm.h | 114 ++++++++++++++++++++++----------------------- 1 file changed, 57 insertions(+), 57 deletions(-) diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h index 1a927f85b95b..8f22a5f55e91 100644 --- a/src/kvstore/comm.h +++ b/src/kvstore/comm.h @@ -119,7 +119,7 @@ class CommCPU : public Comm { const NDArray& Reduce(int key, const std::vector& src, int priority) override { - auto& buf = merge_buf_[key]; + BufferEntry& buf = merge_buf_[key]; // avoid extra copy for single device, but it may bring problems for // abnormal usage of kvstore if (src.size() == 1) { @@ -205,7 +205,7 @@ class CommCPU : public Comm { for (auto& d : dst) CopyFromTo(src, d, priority); } else { // first copy data to cpu, then broadcast - auto& buf = merge_buf_[key]; + BufferEntry& buf = merge_buf_[key]; CopyFromTo(src, &buf.merged, priority); for (auto& d : dst) CopyFromTo(buf.merged, d, priority); } @@ -537,8 +537,8 @@ class CommDevice : public Comm { } InitBuffersAndComm(src); - auto& stage = stage_buf_[key]; - auto& buf = merge_buf_[key]; + BufferEntry& stage = stage_buf_[key]; + BufferEntry& buf = merge_buf_[key]; std::vector reduce_s; const NDArrayStorageType stype = buf.merged.storage_type(); @@ -554,8 +554,7 @@ class CommDevice : public Comm { int id = src[i].ctx().dev_id; if ((!buf.merged.is_none() && id == stage.merged.ctx().dev_id) || (buf.merged.is_none() && i == 0)) { - CopyFromTo(src[i], &(stage.merged), priority); - reduce_s[0] = stage.merged; + reduce_s[0] = src[i]; } else if (id >= 4 || buf.merged.is_none()) { CopyFromTo(src[i], &(stage.copy_buf[j]), priority); reduce_s[j + 1] = stage.copy_buf[j]; @@ -588,8 +587,7 @@ class CommDevice : public Comm { for (size_t i = 0, j = 0; i < src.size(); ++i) { int id = 
src[i].ctx().dev_id; if (id == buf.merged.ctx().dev_id) { - CopyFromTo(src[i], &(buf.merged), priority); - reduce[0] = buf.merged; + reduce[0] = src[i]; } else if (id < 4) { CopyFromTo(src[i], &(buf.copy_buf[j]), priority); reduce[j + 1] = buf.copy_buf[j]; @@ -621,8 +619,8 @@ class CommDevice : public Comm { const NDArray& ReduceCompressed(int key, const std::vector& src, int priority) { InitBuffersAndComm(src); - auto& buf = merge_buf_[key]; - auto& stage = stage_buf_[key]; + BufferEntry& buf = merge_buf_[key]; + BufferEntry& stage = stage_buf_[key]; if (buf.merged.is_none() && stage.copy_buf.empty()) { // one buf for each context stage.copy_buf.resize(src.size()); @@ -646,14 +644,14 @@ class CommDevice : public Comm { } } else if (!buf.merged.is_none()) { if (buf.copy_buf.empty() && stage.copy_buf.empty()) { - buf.copy_buf.resize(g1.size() + 1); - buf.compressed_recv_buf.resize(g1.size() + 1); - buf.compressed_send_buf.resize(g1.size() + 1); - buf.residual.resize(g1.size() + 1); - stage.copy_buf.resize(g2.size()); - stage.compressed_recv_buf.resize(g2.size()); - stage.compressed_send_buf.resize(g2.size()); - stage.residual.resize(g2.size()); + buf.copy_buf.resize(group1.size() + 1); + buf.compressed_recv_buf.resize(group1.size() + 1); + buf.compressed_send_buf.resize(group1.size() + 1); + buf.residual.resize(group1.size() + 1); + stage.copy_buf.resize(group2.size()); + stage.compressed_recv_buf.resize(group2.size()); + stage.compressed_send_buf.resize(group2.size()); + stage.residual.resize(group2.size()); for (size_t i = 0, j = 0, k = 0; i < src.size(); ++i) { int id = src[i].ctx().dev_id; if (id < NVLINK_SUPPORT) { @@ -687,15 +685,15 @@ class CommDevice : public Comm { k++; } } - buf.copy_buf[g1.size()] = NDArray(buf.merged.shape(), buf.merged.ctx(), - false, buf.merged.dtype()); - buf.residual[g1.size()] = NDArray( + buf.copy_buf[group1.size()] = NDArray( + buf.merged.shape(), buf.merged.ctx(), false, buf.merged.dtype()); + buf.residual[group1.size()] = 
NDArray( buf.merged.shape(), stage.merged.ctx(), false, buf.merged.dtype()); - buf.residual[g1.size()] = 0; + buf.residual[group1.size()] = 0; int64_t small_size = gc_->GetCompressedSize(buf.merged.shape().Size()); - buf.compressed_recv_buf[g1.size()] = NDArray( + buf.compressed_recv_buf[group1.size()] = NDArray( TShape{small_size}, buf.merged.ctx(), false, buf.merged.dtype()); - buf.compressed_send_buf[g1.size()] = NDArray( + buf.compressed_send_buf[group1.size()] = NDArray( TShape{small_size}, stage.merged.ctx(), false, buf.merged.dtype()); } } @@ -747,13 +745,13 @@ class CommDevice : public Comm { if (buf.merged.is_none()) { return stage.merged; } else { - gc_->Quantize(stage.merged, &buf.compressed_send_buf[g1.size()], - &(buf.residual[g1.size()]), priority); - CopyFromTo(buf.compressed_send_buf[g1.size()], - &(buf.compressed_recv_buf[g1.size()]), priority); - gc_->Dequantize(buf.compressed_recv_buf[g1.size()], - &(buf.copy_buf[g1.size()]), priority); - reduce[reduce.size() - 1] = buf.copy_buf[g1.size()]; + gc_->Quantize(stage.merged, &buf.compressed_send_buf[group1.size()], + &(buf.residual[group1.size()]), priority); + CopyFromTo(buf.compressed_send_buf[group1.size()], + &(buf.compressed_recv_buf[group1.size()]), priority); + gc_->Dequantize(buf.compressed_recv_buf[group1.size()], + &(buf.copy_buf[group1.size()]), priority); + reduce[reduce.size() - 1] = buf.copy_buf[group1.size()]; ElementwiseSum(reduce, &buf.merged); } @@ -765,15 +763,15 @@ class CommDevice : public Comm { if (!inited_) { // copy to a random device first int dev_id = key % dst.size(); - CopyFromTo(src, (dst[dev_id]), priority); + CopyFromTo(src, dst[dev_id], priority); for (size_t i = 0; i < dst.size(); ++i) { if (i != static_cast(dev_id)) { CopyFromTo(*dst[dev_id], dst[i], priority); } } } else { - auto& buf = merge_buf_[key]; - auto& stage = stage_buf_[key]; + BufferEntry& buf = merge_buf_[key]; + BufferEntry& stage = stage_buf_[key]; if (!buf.merged.is_none()) CopyFromTo(src, 
&buf.merged, priority); CopyFromTo(src, &stage.merged, priority); for (auto d : dst) { @@ -896,11 +894,11 @@ class CommDevice : public Comm { for (auto& d : devs) { if (d.dev_id < 4) - g1.push_back(d); + group1.push_back(d); else - g2.push_back(d); + group2.push_back(d); } - if (g1.empty() || g2.empty()) { + if (group1.empty() || group2.empty()) { // all gpus are all connected by NVLinks: use all-to-all std::unordered_map> ctx_info; for (auto d : devs) { @@ -911,7 +909,7 @@ class CommDevice : public Comm { const TShape shape = std::get<1>(sorted_key_attrs_[i]); const int type = std::get<2>(sorted_key_attrs_[i]); const NDArrayStorageType stype = std::get<3>(sorted_key_attrs_[i]); - auto& stage = stage_buf_[key]; + BufferEntry& stage = stage_buf_[key]; Context ctx; size_t min_size = std::numeric_limits::max(); for (auto it = ctx_info.begin(); it != ctx_info.end(); ++it) { @@ -931,52 +929,54 @@ class CommDevice : public Comm { } else { // QPI connections are included: use spanning tree size_t gpu0, gpu1; - for (gpu0 = 0, gpu1 = 0; gpu0 < g1.size() && gpu1 < g2.size();) { - if (g2[gpu1].dev_id - g1[gpu0].dev_id == NVLINK_SUPPORT) + // gpu0 and gpu1 hold the gpu indexes connected by nvlink between group1 + // and group2 groups accordingly + for (gpu0 = 0, gpu1 = 0; gpu0 < group1.size() && gpu1 < group2.size();) { + if (group2[gpu1].dev_id - group1[gpu0].dev_id == NVLINK_SUPPORT) break; - else if (g2[gpu1].dev_id - g1[gpu0].dev_id > NVLINK_SUPPORT) + else if (group2[gpu1].dev_id - group1[gpu0].dev_id > NVLINK_SUPPORT) gpu0++; else gpu1++; } - if (gpu0 == g1.size() || gpu1 == g2.size()) gpu0 = gpu1 = 0; + if (gpu0 == group1.size() || gpu1 == group2.size()) gpu0 = gpu1 = 0; for (size_t i = 0; i < sorted_key_attrs_.size(); ++i) { const int key = std::get<0>(sorted_key_attrs_[i]); const TShape shape = std::get<1>(sorted_key_attrs_[i]); const int type = std::get<2>(sorted_key_attrs_[i]); const NDArrayStorageType stype = std::get<3>(sorted_key_attrs_[i]); - auto& buf = 
merge_buf_[key]; - auto& stage = stage_buf_[key]; + BufferEntry& buf = merge_buf_[key]; + BufferEntry& stage = stage_buf_[key]; if (stype == kDefaultStorage) { - buf.merged = NDArray(shape, g1[gpu0], false, type); + buf.merged = NDArray(shape, group1[gpu0], false, type); if (buf.copy_buf.empty()) { - buf.copy_buf.resize(g1.size()); - for (size_t i = 0; i < g1.size(); ++i) + buf.copy_buf.resize(group1.size()); + for (size_t i = 0; i < group1.size(); ++i) buf.copy_buf[i] = NDArray(buf.merged.shape(), buf.merged.ctx(), false, buf.merged.dtype()); } - stage.merged = NDArray(shape, g2[gpu1], false, type); + stage.merged = NDArray(shape, group2[gpu1], false, type); if (stage.copy_buf.empty()) { - stage.copy_buf.resize(g2.size() - 1); - for (size_t i = 0; i < g2.size() - 1; ++i) + stage.copy_buf.resize(group2.size() - 1); + for (size_t i = 0; i < group2.size() - 1; ++i) stage.copy_buf[i] = NDArray(stage.merged.shape(), stage.merged.ctx(), false, stage.merged.dtype()); } } else { - buf.merged = NDArray(stype, shape, g1[gpu0], true, type); + buf.merged = NDArray(stype, shape, group1[gpu0], true, type); if (buf.copy_buf.empty()) { - buf.copy_buf.resize(g1.size() + 1); - for (size_t i = 0; i < g1.size() + 1; ++i) + buf.copy_buf.resize(group1.size() + 1); + for (size_t i = 0; i < group1.size() + 1; ++i) buf.copy_buf[i] = NDArray(buf.merged.shape(), buf.merged.ctx(), false, buf.merged.dtype()); } - stage.merged = NDArray(stype, shape, g2[gpu1], true, type); + stage.merged = NDArray(stype, shape, group2[gpu1], true, type); if (stage.copy_buf.empty()) { - stage.copy_buf.resize(g2.size()); - for (size_t i = 0; i < g2.size(); ++i) + stage.copy_buf.resize(group2.size()); + for (size_t i = 0; i < group2.size(); ++i) stage.copy_buf[i] = NDArray(stage.merged.shape(), stage.merged.ctx(), false, stage.merged.dtype()); @@ -988,7 +988,7 @@ class CommDevice : public Comm { } /// \brief the NVLinked connected gpu groups - std::vector g1, g2; + std::vector group1, group2; std::vector 
sorted_key_attrs_; /// \brief temporal space for pushing and pulling struct BufferEntry { From 90a47b068aac560945a9061497a30f6ce8ebb7d9 Mon Sep 17 00:00:00 2001 From: Leyuan Wang Date: Thu, 25 Jan 2018 23:10:43 +0000 Subject: [PATCH 5/8] fixed residual array size --- src/kvstore/comm.h | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h index 8f22a5f55e91..8aa77e6de339 100644 --- a/src/kvstore/comm.h +++ b/src/kvstore/comm.h @@ -647,7 +647,7 @@ class CommDevice : public Comm { buf.copy_buf.resize(group1.size() + 1); buf.compressed_recv_buf.resize(group1.size() + 1); buf.compressed_send_buf.resize(group1.size() + 1); - buf.residual.resize(group1.size() + 1); + buf.residual.resize(group1.size()); stage.copy_buf.resize(group2.size()); stage.compressed_recv_buf.resize(group2.size()); stage.compressed_send_buf.resize(group2.size()); @@ -687,9 +687,6 @@ class CommDevice : public Comm { } buf.copy_buf[group1.size()] = NDArray( buf.merged.shape(), buf.merged.ctx(), false, buf.merged.dtype()); - buf.residual[group1.size()] = NDArray( - buf.merged.shape(), stage.merged.ctx(), false, buf.merged.dtype()); - buf.residual[group1.size()] = 0; int64_t small_size = gc_->GetCompressedSize(buf.merged.shape().Size()); buf.compressed_recv_buf[group1.size()] = NDArray( TShape{small_size}, buf.merged.ctx(), false, buf.merged.dtype()); @@ -746,7 +743,7 @@ class CommDevice : public Comm { return stage.merged; } else { gc_->Quantize(stage.merged, &buf.compressed_send_buf[group1.size()], - &(buf.residual[group1.size()]), priority); + &(buf.residual[buf.residual.size()-1]), priority); CopyFromTo(buf.compressed_send_buf[group1.size()], &(buf.compressed_recv_buf[group1.size()]), priority); gc_->Dequantize(buf.compressed_recv_buf[group1.size()], From d10da3ce502db657b19f90c387bba9836be65e52 Mon Sep 17 00:00:00 2001 From: Leyuan Wang Date: Thu, 25 Jan 2018 23:19:54 +0000 Subject: [PATCH 6/8] add comments to illustrate data flow in 
reduce function --- src/kvstore/comm.h | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h index 8aa77e6de339..d2b6b8bed5a9 100644 --- a/src/kvstore/comm.h +++ b/src/kvstore/comm.h @@ -537,8 +537,11 @@ class CommDevice : public Comm { } InitBuffersAndComm(src); - BufferEntry& stage = stage_buf_[key]; + // merge buffer holds the data of the first group of gpus BufferEntry& buf = merge_buf_[key]; + // stage buffer holds the data of the second group or the first when merge + // buffer is empty + BufferEntry& stage = stage_buf_[key]; std::vector reduce_s; const NDArrayStorageType stype = buf.merged.storage_type(); if (stype == kDefaultStorage) { if (buf.merged.is_none() && stage.copy_buf.empty()) { stage.copy_buf.resize(src.size() - 1); @@ -578,8 +581,11 @@ } } } + // Reducing either the second group of data or the first group when merge + // buffer is empty ElementwiseSum(reduce_s, &stage.merged, priority); - // Main reduce result on gpu 0 including the partial result from gpu 4 + // Main reduce result on the first group of GPUs including the partial + // result from the second group if (!buf.merged.is_none()) { std::vector reduce; if (stype == kDefaultStorage) { @@ -605,6 +611,7 @@ } } } + // Copy the second group's reduction result to the merge buffer CopyFromTo(stage.merged, &(buf.copy_buf[buf.copy_buf.size() - 1]), priority); reduce[reduce.size() - 1] = buf.copy_buf[buf.copy_buf.size() - 1]; @@ -743,7 +750,7 @@ return stage.merged; } else { gc_->Quantize(stage.merged, &buf.compressed_send_buf[group1.size()], - &(buf.residual[buf.residual.size()-1]), priority); + &(buf.residual[buf.residual.size() - 1]), priority); CopyFromTo(buf.compressed_send_buf[group1.size()], &(buf.compressed_recv_buf[group1.size()]), priority); gc_->Dequantize(buf.compressed_recv_buf[group1.size()], From cd62a65298b918d6fadf8fd0e170607589b9b555 Mon Sep 17 00:00:00 2001 From: Leyuan Wang Date: Mon, 29 Jan 2018 19:46:47 +0000 Subject: [PATCH 7/8] test kvstore gpu debugged
--- src/kvstore/comm.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h index d2b6b8bed5a9..5d94c93d9fbf 100644 --- a/src/kvstore/comm.h +++ b/src/kvstore/comm.h @@ -568,8 +568,9 @@ class CommDevice : public Comm { if (buf.merged.is_none() && stage.copy_buf.empty()) { stage.copy_buf.resize(src.size()); for (size_t j = 0; j < src.size(); ++j) - stage.copy_buf[j] = NDArray(stage.merged.shape(), stage.merged.ctx(), - true, stage.merged.dtype()); + stage.copy_buf[j] = + NDArray(stage.merged.storage_type(), stage.merged.shape(), + stage.merged.ctx(), true, stage.merged.dtype()); } reduce_s.resize(stage.copy_buf.size()); for (size_t i = 0, j = 0; i < src.size(); ++i) { From d174d78f72018dc0a2809c77012c3498905c3cc5 Mon Sep 17 00:00:00 2001 From: Leyuan Wang Date: Mon, 29 Jan 2018 23:10:18 +0000 Subject: [PATCH 8/8] stype bugs fixed --- src/kvstore/comm.h | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h index 5d94c93d9fbf..c007a78aefa3 100644 --- a/src/kvstore/comm.h +++ b/src/kvstore/comm.h @@ -544,7 +544,7 @@ class CommDevice : public Comm { BufferEntry& stage = stage_buf_[key]; std::vector reduce_s; - const NDArrayStorageType stype = buf.merged.storage_type(); + const NDArrayStorageType stype = stage.merged.storage_type(); if (stype == kDefaultStorage) { if (buf.merged.is_none() && stage.copy_buf.empty()) { stage.copy_buf.resize(src.size() - 1); @@ -557,7 +557,8 @@ class CommDevice : public Comm { int id = src[i].ctx().dev_id; if ((!buf.merged.is_none() && id == stage.merged.ctx().dev_id) || (buf.merged.is_none() && i == 0)) { - reduce_s[0] = src[i]; + CopyFromTo(src[i], &stage.merged, priority); + reduce_s[0] = stage.merged; } else if (id >= 4 || buf.merged.is_none()) { CopyFromTo(src[i], &(stage.copy_buf[j]), priority); reduce_s[j + 1] = stage.copy_buf[j]; @@ -588,8 +589,9 @@ class CommDevice : public Comm { // Main reduce result on 
the first group of GPUs including the partial // result from the second group if (!buf.merged.is_none()) { + const NDArrayStorageType sstype = buf.merged.storage_type(); std::vector reduce; - if (stype == kDefaultStorage) { + if (sstype == kDefaultStorage) { reduce.resize(buf.copy_buf.size() + 1); for (size_t i = 0, j = 0; i < src.size(); ++i) { int id = src[i].ctx().dev_id; @@ -898,7 +900,7 @@ class CommDevice : public Comm { }); for (auto& d : devs) { - if (d.dev_id < 4) + if (d.dev_id < NVLINK_SUPPORT) group1.push_back(d); else group2.push_back(d); @@ -974,8 +976,9 @@ class CommDevice : public Comm { if (buf.copy_buf.empty()) { buf.copy_buf.resize(group1.size() + 1); for (size_t i = 0; i < group1.size() + 1; ++i) - buf.copy_buf[i] = NDArray(buf.merged.shape(), buf.merged.ctx(), - false, buf.merged.dtype()); + buf.copy_buf[i] = + NDArray(stype, buf.merged.shape(), buf.merged.ctx(), true, + buf.merged.dtype()); } stage.merged = NDArray(stype, shape, group2[gpu1], true, type); @@ -983,7 +986,7 @@ class CommDevice : public Comm { stage.copy_buf.resize(group2.size()); for (size_t i = 0; i < group2.size(); ++i) stage.copy_buf[i] = - NDArray(stage.merged.shape(), stage.merged.ctx(), false, + NDArray(stype, stage.merged.shape(), stage.merged.ctx(), true, stage.merged.dtype()); } }